rogerxavier's picture
Upload 258 files
0aee47a verified
raw
history blame
87.2 kB
"""
bilibili_api.video
视频相关操作
注意,同时存在 page_index 和 cid 的参数,两者至少提供一个。
"""
import re
import os
import json
import struct
import asyncio
import logging
import datetime
from enum import Enum
from inspect import isfunction
from functools import cmp_to_key
from dataclasses import dataclass
from typing import Any, List, Union, Optional
import httpx
import aiohttp
from . import settings
from .utils.aid_bvid_transformer import bvid2aid, aid2bvid
from .utils.utils import get_api
from .utils.AsyncEvent import AsyncEvent
from .utils.credential import Credential
from .utils.BytesReader import BytesReader
from .utils.danmaku import Danmaku, SpecialDanmaku
from .utils.network import get_aiohttp_session, Api, get_session
from .exceptions import (
ArgsException,
NetworkException,
ResponseException,
DanmakuClosedException,
)
API = get_api("video")
def get_cid_info_sync(cid: int):
"""
获取 cid 信息 (对应的视频,具体分 P 序号,up 等)
Returns:
dict: 调用 https://hd.biliplus.com 的 API 返回的结果
"""
api = API["info"]["cid_info"]
params = {"cid": cid}
return Api(**api).update_params(**params).result_sync
async def get_cid_info(cid: int):
"""
获取 cid 信息 (对应的视频,具体分 P 序号,up 等)
Returns:
dict: 调用 https://hd.biliplus.com 的 API 返回的结果
"""
api = API["info"]["cid_info"]
params = {"cid": cid}
return await Api(**api).update_params(**params).result
class DanmakuOperatorType(Enum):
"""
弹幕操作枚举
+ DELETE - 删除弹幕
+ PROTECT - 保护弹幕
+ UNPROTECT - 取消保护弹幕
"""
DELETE = 1
PROTECT = 2
UNPROTECT = 3
class VideoAppealReasonType:
"""
视频投诉原因枚举
注意: 每一项均为函数,部分项有参数,没有参数的函数无需调用函数,直接传入即可,有参数的函数请调用结果之后传入。
- ILLEGAL(): 违法违禁
- PRON(): 色情
- VULGAR(): 低俗
- GAMBLED_SCAMS(): 赌博诈骗
- VIOLENT(): 血腥暴力
- PERSONAL_ATTACK(): 人身攻击
- PLAGIARISM(bvid: str): 与站内其他视频撞车
- BAD_FOR_YOUNGS(): 青少年不良信息
- CLICKBAIT(): 不良封面/标题
- POLITICAL_RUMORS(): 涉政谣言
- SOCIAL_RUMORS(): 涉社会事件谣言
- COVID_RUMORS(): 疫情谣言
- UNREAL_EVENT(): 虚假不实消息
- OTHER(): 有其他问题
- LEAD_WAR(): 引战
- CANNOT_CHARGE(): 不能参加充电
- UNREAL_COPYRIGHT(source: str): 转载/自制类型错误
"""
ILLEGAL = lambda: 2
PRON = lambda: 3
VULGAR = lambda: 4
GAMBLED_SCAMS = lambda: 5
VIOLENT = lambda: 6
PERSONAL_ATTACK = lambda: 7
BAD_FOR_YOUNGS = lambda: 10000
CLICKBAIT = lambda: 10013
POLITICAL_RUMORS = lambda: 10014
SOCIAL_RUMORS = lambda: 10015
COVID_RUMORS = lambda: 10016
UNREAL_EVENT = lambda: 10017
OTHER = lambda: 1
LEAD_WAR = lambda: 9
CANNOT_CHARGE = lambda: 10
@staticmethod
def PLAGIARISM(bvid: str):
"""
与站内其他视频撞车
Args:
bvid (str): 撞车对象
"""
return {"tid": 8, "撞车对象": bvid}
@staticmethod
def UNREAL_COPYRIGHT(source: str):
"""
转载/自制类型错误
Args:
source (str): 原创视频出处
"""
return {"tid": 52, "出处": source}
class Video:
"""
视频类,各种对视频的操作均在里面。
"""
def __init__(
self,
bvid: Union[None, str] = None,
aid: Union[None, int] = None,
credential: Union[None, Credential] = None,
):
"""
Args:
bvid (str | None, optional) : BV 号. bvid 和 aid 必须提供其中之一。
aid (int | None, optional) : AV 号. bvid 和 aid 必须提供其中之一。
credential (Credential | None, optional): Credential 类. Defaults to None.
"""
# ID 检查
if bvid is not None:
self.set_bvid(bvid)
elif aid is not None:
self.set_aid(aid)
else:
# 未提供任一 ID
raise ArgsException("请至少提供 bvid 和 aid 中的其中一个参数。")
# 未提供 credential 时初始化该类
self.credential: Credential = Credential() if credential is None else credential
# 用于存储视频信息,避免接口依赖视频信息时重复调用
self.__info: Union[dict, None] = None
def set_bvid(self, bvid: str) -> None:
"""
设置 bvid。
Args:
bvid (str): 要设置的 bvid。
"""
# 检查 bvid 是否有效
if not re.search("^BV[a-zA-Z0-9]{10}$", bvid):
raise ArgsException("bvid 提供错误,必须是以 BV 开头的纯字母和数字组成的 12 位字符串(大小写敏感)。")
self.__bvid = bvid
self.__aid = bvid2aid(bvid)
def get_bvid(self) -> str:
"""
获取 BVID。
Returns:
str: BVID。
"""
return self.__bvid
def set_aid(self, aid: int) -> None:
"""
设置 aid。
Args:
aid (int): AV 号。
"""
if aid <= 0:
raise ArgsException("aid 不能小于或等于 0。")
self.__aid = aid
self.__bvid = aid2bvid(aid)
def get_aid(self) -> int:
"""
获取 AID。
Returns:
int: aid。
"""
return self.__aid
async def get_info(self) -> dict:
"""
获取视频信息。
Returns:
dict: 调用 API 返回的结果。
"""
api = API["info"]["info"]
params = {"bvid": self.get_bvid(), "aid": self.get_aid()}
resp = (
await Api(**api, credential=self.credential).update_params(**params).result
)
# 存入 self.__info 中以备后续调用
self.__info = resp
return resp
async def get_detail(self) -> dict:
"""
获取视频详细信息
Returns:
dict: 调用 API 返回的结果。
"""
api = API["info"]["detail"]
params = {
"bvid": self.get_bvid(),
"aid": self.get_aid(),
"need_operation_card": 0,
"need_elec": 0,
}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def __get_info_cached(self) -> dict:
"""
获取视频信息,如果已获取过则使用之前获取的信息,没有则重新获取。
Returns:
dict: 调用 API 返回的结果。
"""
if self.__info is None:
return await self.get_info()
return self.__info
# get_stat 403/404 https://github.com/SocialSisterYi/bilibili-API-collect/issues/797 等恢复
# async def get_stat(self) -> dict:
# """
# 获取视频统计数据(播放量,点赞数等)。
# Returns:
# dict: 调用 API 返回的结果。
# """
# api = API["info"]["stat"]
# params = {"bvid": self.get_bvid(), "aid": self.get_aid()}
# return await Api(**api, credential=self.credential).update_params(**params).result
async def get_up_mid(self) -> int:
"""
获取视频 up 主的 mid。
Returns:
int: up_mid
"""
info = await self.__get_info_cached()
return info["owner"]["mid"]
async def get_tags(
self, page_index: Union[int, None] = 0, cid: Union[int, None] = None
) -> List[dict]:
"""
获取视频标签。
Args:
page_index (int | None): 分 P 序号. Defaults to 0.
cid (int | None): 分 P 编码. Defaults to None.
Returns:
List[dict]: 调用 API 返回的结果。
"""
if cid == None:
if page_index == None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.get_cid(page_index=page_index)
api = API["info"]["tags"]
params = {"bvid": self.get_bvid(), "aid": self.get_aid(), "cid": cid}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_chargers(self) -> dict:
"""
获取视频充电用户。
Returns:
dict: 调用 API 返回的结果。
"""
info = await self.__get_info_cached()
mid = info["owner"]["mid"]
api = API["info"]["chargers"]
params = {"aid": self.get_aid(), "bvid": self.get_bvid(), "mid": mid}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_pages(self) -> List[dict]:
"""
获取分 P 信息。
Returns:
dict: 调用 API 返回的结果。
"""
api = API["info"]["pages"]
params = {"aid": self.get_aid(), "bvid": self.get_bvid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def __get_cid_by_index(self, page_index: int) -> int:
"""
根据分 p 号获取 cid。
Args:
page_index (int): 分 P 号,从 0 开始。
Returns:
int: cid 分 P 的唯一 ID。
"""
if page_index < 0:
raise ArgsException("分 p 号必须大于或等于 0。")
info = await self.__get_info_cached()
pages = info["pages"]
if len(pages) <= page_index:
raise ArgsException("不存在该分 p。")
page = pages[page_index]
cid = page["cid"]
return cid
async def get_video_snapshot(
self,
cid: Union[int, None] = None,
json_index: bool = False,
pvideo: bool = True,
) -> dict:
"""
获取视频快照(视频各个时间段的截图拼图)
Args:
cid(int): 分 P CID(可选)
json_index(bool): json 数组截取时间表 True 为需要,False 不需要
pvideo(bool): 是否只获取预览
Returns:
dict: 调用 API 返回的结果,数据中 Url 没有 http 头
"""
params: dict[str, Any] = {"aid": self.get_aid()}
if pvideo:
api = API["info"]["video_snapshot_pvideo"]
else:
params["bvid"] = self.get_bvid()
if json_index:
params["index"] = 1
if cid:
params["cid"] = cid
api = API["info"]["video_snapshot"]
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_cid(self, page_index: int) -> int:
"""
获取稿件 cid
Args:
page_index(int): 分 P
Returns:
int: cid
"""
return await self.__get_cid_by_index(page_index)
async def get_download_url(
self,
page_index: Union[int, None] = None,
cid: Union[int, None] = None,
html5: bool = False,
) -> dict:
"""
获取视频下载信息。
返回结果可以传入 `VideoDownloadURLDataDetecter` 进行解析。
page_index 和 cid 至少提供其中一个,其中 cid 优先级最高
Args:
page_index (int | None, optional) : 分 P 号,从 0 开始。Defaults to None
cid (int | None, optional) : 分 P 的 ID。Defaults to None
html5 (bool, optional): 是否以 html5 平台访问,这样子能直接在网页中播放,但是链接少。
Returns:
dict: 调用 API 返回的结果。
"""
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
api = API["info"]["playurl"]
if html5:
params = {
"avid": self.get_aid(),
"cid": cid,
"qn": "127",
"otype": "json",
"fnval": 4048,
"fourk": 1,
"platform": "html5",
"high_quality": "1",
}
else:
params = {
"avid": self.get_aid(),
"cid": cid,
"qn": "127",
"otype": "json",
"fnval": 4048,
"fourk": 1,
}
result = (
await Api(**api, credential=self.credential).update_params(**params).result
)
result.update({"is_html5": True} if html5 else {})
return result
async def get_related(self) -> dict:
"""
获取相关视频信息。
Returns:
dict: 调用 API 返回的结果。
"""
api = API["info"]["related"]
params = {"aid": self.get_aid(), "bvid": self.get_bvid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_relation(self) -> dict:
"""
获取用户与视频的关系
Returns:
dict: 调用 API 返回的结果。
"""
self.credential.raise_for_no_sessdata()
api = API["info"]["relation"]
params = {"aid": self.get_aid(), "bvid": self.get_bvid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def has_liked(self) -> bool:
"""
视频是否点赞过。
Returns:
bool: 视频是否点赞过。
"""
self.credential.raise_for_no_sessdata()
api = API["info"]["has_liked"]
params = {"bvid": self.get_bvid(), "aid": self.get_aid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_pay_coins(self) -> int:
"""
获取视频已投币数量。
Returns:
int: 视频已投币数量。
"""
self.credential.raise_for_no_sessdata()
api = API["info"]["get_pay_coins"]
params = {"bvid": self.get_bvid(), "aid": self.get_aid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)["multiply"]
async def has_favoured(self) -> bool:
"""
是否已收藏。
Returns:
bool: 视频是否已收藏。
"""
self.credential.raise_for_no_sessdata()
api = API["info"]["has_favoured"]
params = {"bvid": self.get_bvid(), "aid": self.get_aid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)["favoured"]
async def is_forbid_note(self) -> bool:
"""
是否禁止笔记。
Returns:
bool: 是否禁止笔记。
"""
api = API["info"]["is_forbid"]
params = {"aid": self.get_aid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)["forbid_note_entrance"]
async def get_private_notes_list(self) -> list:
"""
获取稿件私有笔记列表。
Returns:
list: note_Ids。
"""
self.credential.raise_for_no_sessdata()
api = API["info"]["private_notes"]
params = {"oid": self.get_aid(), "oid_type": 0}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)["noteIds"]
async def get_public_notes_list(self, pn: int, ps: int) -> dict:
"""
获取稿件公开笔记列表。
Args:
pn (int): 页码
ps (int): 每页项数
Returns:
dict: 调用 API 返回的结果。
"""
api = API["info"]["public_notes"]
params = {"oid": self.get_aid(), "oid_type": 0, "pn": pn, "ps": ps}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_ai_conclusion(self, cid: Optional[int] = None, page_index: Optional[int] = None,
up_mid: Optional[int] = None) -> dict:
"""
获取稿件 AI 总结结果。
cid 和 page_index 至少提供其中一个,其中 cid 优先级最高
Args:
cid (Optional, int): 分 P 的 cid。
page_index (Optional, int): 分 P 号,从 0 开始。
up_mid (Optional, int): up 主的 mid。
Returns:
dict: 调用 API 返回的结果。
"""
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
api = API["info"]["ai_conclusion"]
params = {"aid": self.get_aid(), "bvid": self.get_bvid(), "cid": cid,
"up_mid": await self.get_up_mid() if up_mid is None else up_mid}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def get_danmaku_view(
self, page_index: Union[int, None] = None, cid: Union[int, None] = None
) -> dict:
"""
获取弹幕设置、特殊弹幕、弹幕数量、弹幕分段等信息。
Args:
page_index (int, optional): 分 P 号,从 0 开始。Defaults to None
cid (int, optional): 分 P 的 ID。Defaults to None
Returns:
dict: 调用 API 返回的结果。
"""
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
session = get_session()
api = API["danmaku"]["view"]
config = {}
config["url"] = api["url"]
config["params"] = {"type": 1, "oid": cid, "pid": self.get_aid()}
config["cookies"] = self.credential.get_cookies()
config["headers"] = {
"Referer": "https://www.bilibili.com",
"User-Agent": "Mozilla/5.0",
}
try:
resp = await session.get(**config)
except Exception as e:
raise NetworkException(-1, str(e))
resp_data = resp.read()
json_data = {}
reader = BytesReader(resp_data)
# 解析二进制数据流
def read_dm_seg(stream: bytes):
reader_ = BytesReader(stream)
data = {}
while not reader_.has_end():
t = reader_.varint() >> 3
if t == 1:
data["page_size"] = reader_.varint()
elif t == 2:
data["total"] = reader_.varint()
else:
continue
return data
def read_flag(stream: bytes):
reader_ = BytesReader(stream)
data = {}
while not reader_.has_end():
t = reader_.varint() >> 3
if t == 1:
data["rec_flag"] = reader_.varint()
elif t == 2:
data["rec_text"] = reader_.string()
elif t == 3:
data["rec_switch"] = reader_.varint()
else:
continue
return data
def read_command_danmakus(stream: bytes):
reader_ = BytesReader(stream)
data = {}
while not reader_.has_end():
t = reader_.varint() >> 3
if t == 1:
data["id"] = reader_.varint()
elif t == 2:
data["oid"] = reader_.varint()
elif t == 3:
data["mid"] = reader_.varint()
elif t == 4:
data["commend"] = reader_.string()
elif t == 5:
data["content"] = reader_.string()
elif t == 6:
data["progress"] = reader_.varint()
elif t == 7:
data["ctime"] = reader_.string()
elif t == 8:
data["mtime"] = reader_.string()
elif t == 9:
data["extra"] = json.loads(reader_.string())
elif t == 10:
data["id_str"] = reader_.string()
else:
continue
return data
def read_settings(stream: bytes):
reader_ = BytesReader(stream)
data = {}
while not reader_.has_end():
t = reader_.varint() >> 3
if t == 1:
data["dm_switch"] = reader_.bool()
elif t == 2:
data["ai_switch"] = reader_.bool()
elif t == 3:
data["ai_level"] = reader_.varint()
elif t == 4:
data["enable_top"] = reader_.bool()
elif t == 5:
data["enable_scroll"] = reader_.bool()
elif t == 6:
data["enable_bottom"] = reader_.bool()
elif t == 7:
data["enable_color"] = reader_.bool()
elif t == 8:
data["enable_special"] = reader_.bool()
elif t == 9:
data["prevent_shade"] = reader_.bool()
elif t == 10:
data["dmask"] = reader_.bool()
elif t == 11:
data["opacity"] = reader_.float(True)
elif t == 12:
data["dm_area"] = reader_.varint()
elif t == 13:
data["speed_plus"] = reader_.float(True)
elif t == 14:
data["font_size"] = reader_.float(True)
elif t == 15:
data["screen_sync"] = reader_.bool()
elif t == 16:
data["speed_sync"] = reader_.bool()
elif t == 17:
data["font_family"] = reader_.string()
elif t == 18:
data["bold"] = reader_.bool()
elif t == 19:
data["font_border"] = reader_.varint()
elif t == 20:
data["draw_type"] = reader_.string()
else:
continue
return data
def read_image_danmakus(string: bytes):
image_list = []
reader_ = BytesReader(string)
while not reader_.has_end():
type_ = reader_.varint() >> 3
if type_ == 1:
details_dict: dict[Any, Any] = {"texts": []}
img_details = reader_.bytes_string()
reader_details = BytesReader(img_details)
while not reader_details.has_end():
type_details = reader_details.varint() >> 3
if type_details == 1:
details_dict["texts"].append(reader_details.string())
elif type_details == 2:
details_dict["image"] = reader_details.string()
elif type_details == 3:
id_string = reader_details.bytes_string()
id_reader = BytesReader(id_string)
while not id_reader.has_end():
type_id = id_reader.varint() >> 3
if type_id == 2:
details_dict["id"] = id_reader.varint()
else:
raise ResponseException("解析响应数据错误")
image_list.append(details_dict)
else:
raise ResponseException("解析响应数据错误")
return image_list
while not reader.has_end():
type_ = reader.varint() >> 3
if type_ == 1:
json_data["state"] = reader.varint()
elif type_ == 2:
json_data["text"] = reader.string()
elif type_ == 3:
json_data["text_side"] = reader.string()
elif type_ == 4:
json_data["dm_seg"] = read_dm_seg(reader.bytes_string())
elif type_ == 5:
json_data["flag"] = read_flag(reader.bytes_string())
elif type_ == 6:
if "special_dms" not in json_data:
json_data["special_dms"] = []
json_data["special_dms"].append(reader.string())
elif type_ == 7:
json_data["check_box"] = reader.bool()
elif type_ == 8:
json_data["count"] = reader.varint()
elif type_ == 9:
if "command_dms" not in json_data:
json_data["command_dms"] = []
json_data["command_dms"].append(
read_command_danmakus(reader.bytes_string())
)
elif type_ == 10:
json_data["dm_setting"] = read_settings(reader.bytes_string())
elif type_ == 12:
json_data["image_dms"] = read_image_danmakus(reader.bytes_string())
else:
continue
return json_data
async def get_danmakus(
self,
page_index: int = 0,
date: Union[datetime.date, None] = None,
cid: Union[int, None] = None,
from_seg: Union[int, None] = None,
to_seg: Union[int, None] = None,
) -> List[Danmaku]:
"""
获取弹幕。
Args:
page_index (int, optional): 分 P 号,从 0 开始。Defaults to None
date (datetime.Date | None, optional): 指定日期后为获取历史弹幕,精确到年月日。Defaults to None.
cid (int | None, optional): 分 P 的 ID。Defaults to None
from_seg (int, optional): 从第几段开始(0 开始编号,None 为从第一段开始,一段 6 分钟). Defaults to None.
to_seg (int, optional): 到第几段结束(0 开始编号,None 为到最后一段,包含编号的段,一段 6 分钟). Defaults to None.
注意:
- 1. 段数可以使用 `get_danmaku_view()["dm_seg"]["total"]` 查询。
- 2. `from_seg` 和 `to_seg` 仅对 `date == None` 的时候有效果。
- 3. 例:取前 `12` 分钟的弹幕:`from_seg=0, to_seg=1`
Returns:
List[Danmaku]: Danmaku 类的列表。
"""
if date is not None:
self.credential.raise_for_no_sessdata()
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
session = get_session()
aid = self.get_aid()
params: dict[str, Any] = {"oid": cid, "type": 1, "pid": aid}
if date is not None:
# 获取历史弹幕
api = API["danmaku"]["get_history_danmaku"]
params["date"] = date.strftime("%Y-%m-%d")
params["type"] = 1
from_seg = to_seg = 0
else:
api = API["danmaku"]["get_danmaku"]
if from_seg == None:
from_seg = 0
if to_seg == None:
view = await self.get_danmaku_view(cid=cid)
to_seg = view["dm_seg"]["total"] - 1
danmakus = []
for seg in range(from_seg, to_seg + 1):
if date is None:
# 仅当获取当前弹幕时需要该参数
params["segment_index"] = seg + 1
config = {}
config["url"] = api["url"]
config["params"] = params
config["headers"] = {
"Referer": "https://www.bilibili.com",
"User-Agent": "Mozilla/5.0",
}
config["cookies"] = self.credential.get_cookies()
try:
req = await session.get(**config)
except Exception as e:
raise NetworkException(-1, str(e))
if "content-type" not in req.headers.keys():
break
else:
content_type = req.headers["content-type"]
if content_type != "application/octet-stream":
raise ResponseException("返回数据类型错误:")
# 解析二进制流数据
data = req.read()
if data == b"\x10\x01":
# 视频弹幕被关闭
raise DanmakuClosedException()
reader = BytesReader(data)
while not reader.has_end():
type_ = reader.varint() >> 3
if type_ != 1:
if type_ == 4:
reader.bytes_string()
# 什么鬼?我用 protoc 解析出乱码!
elif type_ == 5:
# 大会员专属颜色
reader.varint()
reader.varint()
reader.varint()
reader.bytes_string()
elif type_ == 13:
# ???
continue
else:
raise ResponseException("解析响应数据错误")
dm = Danmaku("")
dm_pack_data = reader.bytes_string()
dm_reader = BytesReader(dm_pack_data)
while not dm_reader.has_end():
data_type = dm_reader.varint() >> 3
if data_type == 1:
dm.id_ = dm_reader.varint()
elif data_type == 2:
dm.dm_time = dm_reader.varint() / 1000
elif data_type == 3:
dm.mode = dm_reader.varint()
elif data_type == 4:
dm.font_size = dm_reader.varint()
elif data_type == 5:
color = dm_reader.varint()
if color != 60001:
color = hex(color)[2:]
else:
color = "special"
dm.color = color
elif data_type == 6:
dm.crc32_id = dm_reader.string()
elif data_type == 7:
dm.text = dm_reader.string()
elif data_type == 8:
dm.send_time = dm_reader.varint()
elif data_type == 9:
dm.weight = dm_reader.varint()
elif data_type == 10:
dm.action = str(dm_reader.string())
elif data_type == 11:
dm.pool = dm_reader.varint()
elif data_type == 12:
dm.id_str = dm_reader.string()
elif data_type == 13:
dm.attr = dm_reader.varint()
elif data_type == 14:
dm.uid = dm_reader.varint()
elif data_type == 15:
dm_reader.varint()
elif data_type == 20:
dm_reader.bytes_string()
elif data_type == 21:
dm_reader.bytes_string()
elif data_type == 22:
dm_reader.bytes_string()
elif data_type == 25:
dm_reader.varint()
elif data_type == 26:
dm_reader.varint()
else:
break
danmakus.append(dm)
return danmakus
async def get_special_dms(
self, page_index: int = 0, cid: Union[int, None] = None
) -> List[SpecialDanmaku]:
"""
获取特殊弹幕
Args:
page_index (int, optional) : 分 P 号. Defaults to 0.
cid (int | None, optional): 分 P id. Defaults to None.
Returns:
List[SpecialDanmaku]: 调用接口解析后的结果
"""
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
view = await self.get_danmaku_view(cid=cid)
special_dms = view["special_dms"][0]
if settings.proxy != "":
sess = httpx.AsyncClient(proxies={"all://": settings.proxy})
else:
sess = httpx.AsyncClient()
dm_content = await sess.get(special_dms, cookies=self.credential.get_cookies())
dm_content.raise_for_status()
reader = BytesReader(dm_content.content)
dms: List[SpecialDanmaku] = []
while not reader.has_end():
spec_dm = SpecialDanmaku("")
type_ = reader.varint() >> 3
if type_ == 1:
reader_ = BytesReader(reader.bytes_string())
while not reader_.has_end():
type__ = reader_.varint() >> 3
if type__ == 1:
spec_dm.id_ = reader_.varint()
elif type__ == 3:
spec_dm.mode = reader_.varint()
elif type__ == 4:
reader_.varint()
elif type__ == 5:
reader_.varint()
elif type__ == 6:
reader_.string()
elif type__ == 7:
spec_dm.content = reader_.string()
elif type__ == 8:
reader_.varint()
elif type__ == 11:
spec_dm.pool = reader_.varint()
elif type__ == 12:
spec_dm.id_str = reader_.string()
else:
continue
else:
continue
dms.append(spec_dm)
return dms
async def get_history_danmaku_index(
self,
page_index: Union[int, None] = None,
date: Union[datetime.date, None] = None,
cid: Union[int, None] = None,
) -> Union[None, List[str]]:
"""
获取特定月份存在历史弹幕的日期。
Args:
page_index (int | None, optional): 分 P 号,从 0 开始。Defaults to None
date (datetime.date | None): 精确到年月. Defaults to None。
cid (int | None, optional): 分 P 的 ID。Defaults to None
Returns:
None | List[str]: 调用 API 返回的结果。不存在时为 None。
"""
if date is None:
raise ArgsException("请提供 date 参数")
self.credential.raise_for_no_sessdata()
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
api = API["danmaku"]["get_history_danmaku_index"]
params = {"oid": cid, "month": date.strftime("%Y-%m"), "type": 1}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def has_liked_danmakus(
self,
page_index: Union[int, None] = None,
ids: Union[List[int], None] = None,
cid: Union[int, None] = None,
) -> dict:
"""
是否已点赞弹幕。
Args:
page_index (int | None, optional): 分 P 号,从 0 开始。Defaults to None
ids (List[int] | None): 要查询的弹幕 ID 列表。
cid (int | None, optional): 分 P 的 ID。Defaults to None
Returns:
dict: 调用 API 返回的结果。
"""
if ids is None or len(ids) == 0:
raise ArgsException("请提供 ids 参数并至少有一个元素")
self.credential.raise_for_no_sessdata()
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
api = API["danmaku"]["has_liked_danmaku"]
params = {"oid": cid, "ids": ",".join(ids)} # type: ignore
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def send_danmaku(
self,
page_index: Union[int, None] = None,
danmaku: Union[Danmaku, None] = None,
cid: Union[int, None] = None,
) -> dict:
"""
发送弹幕。
Args:
page_index (int | None, optional): 分 P 号,从 0 开始。Defaults to None
danmaku (Danmaku | None) : Danmaku 类。
cid (int | None, optional): 分 P 的 ID。Defaults to None
Returns:
dict: 调用 API 返回的结果。
"""
if danmaku is None:
raise ArgsException("请提供 danmaku 参数")
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
api = API["danmaku"]["send_danmaku"]
if danmaku.is_sub:
pool = 1
else:
pool = 0
data = {
"type": 1,
"oid": cid,
"msg": danmaku.text,
"aid": self.get_aid(),
"bvid": self.get_bvid(),
"progress": int(danmaku.dm_time * 1000),
"color": int(danmaku.color, 16),
"fontsize": danmaku.font_size,
"pool": pool,
"mode": danmaku.mode,
"plat": 1,
}
return await Api(**api, credential=self.credential).update_data(**data).result
async def get_danmaku_xml(
self, page_index: Union[int, None] = None, cid: Union[int, None] = None
) -> str:
"""
获取所有弹幕的 xml 源文件(非装填)
Args:
page_index (int, optional) : 分 P 序号. Defaults to 0.
cid (int | None, optional): cid. Defaults to None.
Return:
xml 文件源
"""
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
url = f"https://comment.bilibili.com/{cid}.xml"
sess = get_session()
config: dict[Any, Any] = {"url": url}
# 代理
if settings.proxy:
config["proxies"] = {"all://", settings.proxy}
resp = await sess.get(**config)
return resp.content.decode("utf-8")
async def like_danmaku(
self,
page_index: Union[int, None] = None,
dmid: Union[int, None] = None,
status: Union[bool, None] = True,
cid: Union[int, None] = None,
) -> dict:
"""
点赞弹幕。
Args:
page_index (int | None, optional) : 分 P 号,从 0 开始。Defaults to None
dmid (int | None) : 弹幕 ID。
status (bool | None, optional): 点赞状态。Defaults to True
cid (int | None, optional) : 分 P 的 ID。Defaults to None
Returns:
dict: 调用 API 返回的结果。
"""
if dmid is None:
raise ArgsException("请提供 dmid 参数")
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
api = API["danmaku"]["like_danmaku"]
data = {
"dmid": dmid,
"oid": cid,
"op": 1 if status else 2,
"platform": "web_player",
}
return await Api(**api, credential=self.credential).update_data(**data).result
async def get_online(self, cid: Optional[int] = None, page_index: Optional[int] = 0) -> dict:
"""
获取实时在线人数
Returns:
dict: 调用 API 返回的结果。
"""
api = API["info"]["online"]
params = {"aid": self.get_aid(), "bvid": self.get_bvid(),
"cid": cid if cid is not None else await self.get_cid(page_index=page_index)}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def operate_danmaku(
self,
page_index: Union[int, None] = None,
dmids: Union[List[int], None] = None,
cid: Union[int, None] = None,
type_: Union[DanmakuOperatorType, None] = None,
) -> dict:
"""
操作弹幕
Args:
page_index (int | None, optional) : 分 P 号,从 0 开始。Defaults to None
dmids (List[int] | None) : 弹幕 ID 列表。
cid (int | None, optional) : 分 P 的 ID。Defaults to None
type_ (DanmakuOperatorType | None): 操作类型
Returns:
dict: 调用 API 返回的结果。
"""
if dmids is None or len(dmids) == 0:
raise ArgsException("请提供 dmid 参数")
if type_ is None:
raise ArgsException("请提供 type_ 参数")
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
api = API["danmaku"]["edit_danmaku"]
data = {
"type": 1,
"dmids": ",".join(map(lambda x: str(x), dmids)),
"oid": cid,
"state": type_.value,
}
return await Api(**api, credential=self.credential).update_data(**data).result
async def like(self, status: bool = True) -> dict:
"""
点赞视频。
Args:
status (bool, optional): 点赞状态。Defaults to True.
Returns:
dict: 调用 API 返回的结果。
"""
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = API["operate"]["like"]
data = {"aid": self.get_aid(), "like": 1 if status else 2}
return await Api(**api, credential=self.credential).update_data(**data).result
async def pay_coin(self, num: int = 1, like: bool = False) -> dict:
"""
投币。
Args:
num (int, optional) : 硬币数量,为 1 ~ 2 个。Defaults to 1.
like (bool, optional): 是否同时点赞。Defaults to False.
Returns:
dict: 调用 API 返回的结果。
"""
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
if num not in (1, 2):
raise ArgsException("投币数量只能是 1 ~ 2 个。")
api = API["operate"]["coin"]
data = {
"aid": self.get_aid(),
"bvid": self.get_bvid(),
"multiply": num,
"like": 1 if like else 0,
}
return await Api(**api, credential=self.credential).update_data(**data).result
async def share(self) -> int:
"""
分享视频
Returns:
int: 当前分享数
"""
api = API["operate"]["share"]
data = {
"bvid": self.get_bvid(),
"aid": self.get_aid(),
"csrf": self.credential.bili_jct,
}
return await Api(**api, credential=self.credential).update_data(**data).result
async def triple(self) -> dict:
"""
给阿婆主送上一键三连
Returns:
dict: 调用 API 返回的结果
"""
api = API["operate"]["yjsl"]
data = {"bvid": self.get_bvid(), "aid": self.get_aid()}
return await Api(**api, credential=self.credential).update_data(**data).result
async def add_tag(self, name: str) -> dict:
"""
添加标签。
Args:
name (str): 标签名字。
Returns:
dict: 调用 API 返回的结果。会返回标签 ID。
"""
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = API["operate"]["add_tag"]
data = {"aid": self.get_aid(), "bvid": self.get_bvid(), "tag_name": name}
return await Api(**api, credential=self.credential).update_data(**data).result
async def delete_tag(self, tag_id: int) -> dict:
"""
删除标签。
Args:
tag_id (int): 标签 ID。
Returns:
dict: 调用 API 返回的结果。
"""
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = API["operate"]["del_tag"]
data = {"tag_id": tag_id, "aid": self.get_aid(), "bvid": self.get_bvid()}
return await Api(**api, credential=self.credential).update_data(**data).result
async def appeal(self, reason: Any, detail: str):
"""
投诉稿件
Args:
reason (Any): 投诉类型。传入 VideoAppealReasonType 中的项目即可。
detail (str): 详情信息。
Returns:
dict: 调用 API 返回的结果
"""
api = API["operate"]["appeal"]
data = {"aid": self.get_aid(), "desc": detail}
if isfunction(reason):
reason = reason()
if isinstance(reason, int):
reason = {"tid": reason}
data.update(reason)
# XXX: 暂不支持上传附件
return await Api(**api, credential=self.credential).update_data(**data).result
async def set_favorite(
self, add_media_ids: List[int] = [], del_media_ids: List[int] = []
) -> dict:
"""
设置视频收藏状况。
Args:
add_media_ids (List[int], optional): 要添加到的收藏夹 ID. Defaults to [].
del_media_ids (List[int], optional): 要移出的收藏夹 ID. Defaults to [].
Returns:
dict: 调用 API 返回结果。
"""
if len(add_media_ids) + len(del_media_ids) == 0:
raise ArgsException("对收藏夹无修改。请至少提供 add_media_ids 和 del_media_ids 中的其中一个。")
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = API["operate"]["favorite"]
data = {
"rid": self.get_aid(),
"type": 2,
"add_media_ids": ",".join(map(lambda x: str(x), add_media_ids)),
"del_media_ids": ",".join(map(lambda x: str(x), del_media_ids)),
}
return await Api(**api, credential=self.credential).update_data(**data).result
async def get_subtitle(
self,
cid: Union[int, None] = None,
) -> dict:
"""
获取视频上一次播放的记录,字幕和地区信息。需要分集的 cid, 返回数据中含有json字幕的链接
Args:
cid (int | None): 分 P ID,从视频信息中获取
Returns:
调用 API 返回的结果
"""
if cid is None:
raise ArgsException("需要 cid")
return (await self.get_player_info(cid=cid)).get("subtitle")
async def get_player_info(
self,
cid: Union[int, None] = None,
epid: Union[int, None] = None,
) -> dict:
"""
获取字幕信息
Args:
cid (int | None): 分 P ID,从视频信息中获取
epid (int | None): 番剧分集 ID,从番剧信息中获取
Returns:
调用 API 返回的结果
"""
if cid is None:
raise ArgsException("需要 cid")
api = API["info"]["get_player_info"]
params = {
"aid": self.get_aid(),
"cid": cid,
}
if epid:
params["epid"] = epid
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def submit_subtitle(
self,
lan: str,
data: dict,
submit: bool,
sign: bool,
page_index: Union[int, None] = None,
cid: Union[int, None] = None,
) -> dict:
"""
上传字幕
字幕数据 data 参考:
```json
{
"font_size": "float: 字体大小,默认 0.4",
"font_color": "str: 字体颜色,默认 \"#FFFFFF\"",
"background_alpha": "float: 背景不透明度,默认 0.5",
"background_color": "str: 背景颜色,默认 \"#9C27B0\"",
"Stroke": "str: 描边,目前作用未知,默认为 \"none\"",
"body": [
{
"from": "int: 字幕开始时间(秒)",
"to": "int: 字幕结束时间(秒)",
"location": "int: 字幕位置,默认为 2",
"content": "str: 字幕内容"
}
]
}
```
Args:
lan (str) : 字幕语言代码,参考 https://s1.hdslb.com/bfs/subtitle/subtitle_lan.json
data (dict) : 字幕数据
submit (bool) : 是否提交,不提交为草稿
sign (bool) : 是否署名
page_index (int | None, optional): 分 P 索引. Defaults to None.
cid (int | None, optional): 分 P id. Defaults to None.
Returns:
dict: API 调用返回结果
"""
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = API["operate"]["submit_subtitle"]
# lan check,应该是这里面的语言代码
with open(
os.path.join(os.path.dirname(__file__), "data/subtitle_lan.json"),
encoding="utf-8",
) as f:
subtitle_lans = json.load(f)
for lan in subtitle_lans:
if lan["lan"] == lan:
break
else:
raise ArgsException("lan 参数错误,请参见 https://s1.hdslb.com/bfs/subtitle/subtitle_lan.json")
payload = {
"type": 1,
"oid": cid,
"lan": lan,
"data": json.dumps(data),
"submit": submit,
"sign": sign,
"bvid": self.get_bvid(),
}
return await Api(**api, credential=self.credential).update_data(**payload).result
async def get_danmaku_snapshot(self) -> dict:
"""
获取弹幕快照
Returns:
调用 API 返回的结果
"""
api = API["danmaku"]["snapshot"]
params = {"aid": self.get_aid()}
return (
await Api(**api, credential=self.credential).update_params(**params).result
)
async def recall_danmaku(
self,
page_index: Union[int, None] = None,
dmid: int = 0,
cid: Union[int, None] = None,
) -> dict:
"""
撤回弹幕
Args:
page_index(int | None, optional): 分 P 号
dmid(int) : 弹幕 id
cid(int | None, optional) : 分 P 编码
Returns:
调用 API 返回的结果
"""
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = API["danmaku"]["recall"]
data = {"dmid": dmid, "cid": cid}
return await Api(**api, credential=self.credential).update_data(**data).result
async def get_pbp(
self, page_index: Union[int, None] = None, cid: Union[int, None] = None
) -> dict:
"""
获取高能进度条
Args:
page_index(int | None): 分 P 号
cid(int | None) : 分 P 编码
Returns:
调用 API 返回的结果
"""
if cid is None:
if page_index is None:
raise ArgsException("page_index 和 cid 至少提供一个。")
cid = await self.__get_cid_by_index(page_index)
api = API["info"]["pbp"]
params = {"cid": cid}
session = get_session()
return json.loads(
(
await session.get(
api["url"], params=params, cookies=self.credential.get_cookies()
)
).text
)
async def add_to_toview(self) -> dict:
"""
添加视频至稍后再看列表
Returns:
调用 API 返回的结果
"""
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = get_api("toview")["operate"]["add"]
datas = {
"aid": self.get_aid(),
}
return await Api(**api, credential=self.credential).update_data(**datas).result
async def delete_from_toview(self) -> dict:
"""
从稍后再看列表删除视频
Returns:
调用 API 返回的结果
"""
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = get_api("toview")["operate"]["del"]
datas = {"viewed": "false", "aid": self.get_aid()}
return await Api(**api, credential=self.credential).update_data(**datas).result
class VideoOnlineMonitor(AsyncEvent):
"""
视频在线人数实时监测。
示例代码:
```python
import asyncio
from bilibili_api import video
# 实例化
r = video.VideoOnlineMonitor("BV1Bf4y1Q7QP")
# 装饰器方法注册事件监听器
@r.on("ONLINE")
async def handler(data):
print(data)
# 函数方法注册事件监听器
async def handler2(data):
print(data)
r.add_event_listener("ONLINE", handler2)
asyncio.get_event_loop().run_until_complete(r.connect())
```
Extends: AsyncEvent
Events:
ONLINE: 在线人数更新。 CallbackData: dict。
DANMAKU: 收到实时弹幕。 CallbackData: Danmaku。
DISCONNECTED: 正常断开连接。 CallbackData: None。
ERROR: 发生错误。 CallbackData: aiohttp.ClientWebSocketResponse。
CONNECTED: 成功连接。 CallbackData: None。
"""
class Datapack(Enum):
"""
数据包类型枚举。
+ CLIENT_VERIFY : 客户端发送验证信息。
+ SERVER_VERIFY : 服务端响应验证信息。
+ CLIENT_HEARTBEAT: 客户端发送心跳包。
+ SERVER_HEARTBEAT: 服务端响应心跳包。
+ DANMAKU : 实时弹幕更新。
"""
CLIENT_VERIFY = 0x7
SERVER_VERIFY = 0x8
CLIENT_HEARTBEAT = 0x2
SERVER_HEARTBEAT = 0x3
DANMAKU = 0x3E8
def __init__(
self,
bvid: Union[str, None] = None,
aid: Union[int, None] = None,
page_index: int = 0,
credential: Union[Credential, None] = None,
debug: bool = False,
):
"""
Args:
bvid (str | None, optional) : BVID. Defaults to None.
aid (int | None, optional) : AID. Defaults to None.
page_index (int, optional) : 分 P 序号. Defaults to 0.
credential (Credential | None, optional): Credential 类. Defaults to None.
debug (bool, optional) : 调试模式,将输出更详细信息. Defaults to False.
"""
super().__init__()
self.credential = credential
self.__video = Video(bvid, aid, credential=credential)
# 智能选择在 log 中展示的 ID。
id_showed = None
if bvid is not None:
id_showed = bvid
else:
id_showed = aid
# logger 初始化
self.logger = logging.getLogger(f"VideoOnlineMonitor-{id_showed}")
if not self.logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter(
"[" + str(id_showed) + "][%(asctime)s][%(levelname)s] %(message)s"
)
)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO if not debug else logging.DEBUG)
self.__page_index = page_index
self.__tasks = []
async def connect(self):
"""
连接服务器
"""
await self.__main()
async def disconnect(self):
"""
断开服务器
"""
self.logger.info("主动断开连接。")
self.dispatch("DISCONNECTED")
await self.__cancel_all_tasks()
await self.__ws.close()
async def __main(self):
"""
入口。
"""
# 获取分 P id
pages = await self.__video.get_pages()
if self.__page_index >= len(pages):
raise ArgsException("不存在该分 P。")
cid = pages[self.__page_index]["cid"]
# 获取服务器信息
self.logger.debug(f"准备连接:{self.__video.get_bvid()}")
self.logger.debug(f"获取服务器信息中...")
api = API["info"]["video_online_broadcast_servers"]
resp = await Api(**api, credential=self.credential).result
uri = f"wss://{resp['domain']}:{resp['wss_port']}/sub"
self.__heartbeat_interval = resp["heartbeat"]
self.logger.debug(f"服务器信息获取成功,URI:{uri}")
# 连接服务器
self.logger.debug("准备连接服务器...")
session = get_aiohttp_session()
async with session.ws_connect(uri) as ws:
self.__ws = ws
# 发送认证信息
self.logger.debug("服务器连接成功,准备发送认证信息...")
verify_info = {
"room_id": f"video://{self.__video.get_aid()}/{cid}",
"platform": "web",
"accepts": [1000, 1015],
}
verify_info = json.dumps(verify_info, separators=(",", ":"))
await ws.send_bytes(
self.__pack(
VideoOnlineMonitor.Datapack.CLIENT_VERIFY, 1, verify_info.encode()
)
)
# 循环接收消息
async for msg in ws:
if msg.type == aiohttp.WSMsgType.BINARY:
data = self.__unpack(msg.data)
self.logger.debug(f"收到消息:{data}")
await self.__handle_data(data) # type: ignore
elif msg.type == aiohttp.WSMsgType.ERROR:
self.logger.warning("连接被异常断开")
await self.__cancel_all_tasks()
self.dispatch("ERROR", msg)
break
async def __handle_data(self, data: List[dict]):
"""
处理数据。
Args:
data (List[dict]): 收到的数据(已解析好)。
"""
for d in data:
if d["type"] == VideoOnlineMonitor.Datapack.SERVER_VERIFY.value:
# 服务器认证反馈。
if d["data"]["code"] == 0:
# 创建心跳 Task
heartbeat = asyncio.create_task(self.__heartbeat_task())
self.__tasks.append(heartbeat)
self.logger.info("连接服务器并验证成功")
elif d["type"] == VideoOnlineMonitor.Datapack.SERVER_HEARTBEAT.value:
# 心跳包反馈,同时包含在线人数。
self.logger.debug(f'收到服务器心跳包反馈,编号:{d["number"]}')
self.logger.info(f'实时观看人数:{d["data"]["data"]["room"]["online"]}')
self.dispatch("ONLINE", d["data"])
elif d["type"] == VideoOnlineMonitor.Datapack.DANMAKU.value:
# 实时弹幕。
info = d["data"][0].split(",")
text = d["data"][1]
if info[5] == "0":
is_sub = False
else:
is_sub = True
dm = Danmaku(
dm_time=float(info[0]),
send_time=int(info[4]),
crc32_id=info[6],
color=info[3],
mode=info[1],
font_size=info[2],
is_sub=is_sub,
text=text,
)
self.logger.info(f"收到实时弹幕:{dm.text}")
self.dispatch("DANMAKU", dm)
else:
# 未知类型数据包
self.logger.warning("收到未知的数据包类型,无法解析:" + json.dumps(d))
async def __heartbeat_task(self):
"""
心跳 Task。
"""
index = 2
while True:
self.logger.debug(f"发送心跳包,编号:{index}")
await self.__ws.send_bytes(
self.__pack(
VideoOnlineMonitor.Datapack.CLIENT_HEARTBEAT,
index,
b"[object Object]",
)
)
index += 1
await asyncio.sleep(self.__heartbeat_interval)
async def __cancel_all_tasks(self):
"""
取消所有 Task。
"""
for task in self.__tasks:
task.cancel()
@staticmethod
def __pack(data_type: Datapack, number: int, data: bytes):
"""
打包数据。
# 数据包格式:
16B 头部:
| offset(bytes) | length(bytes) | type | description |
| ------------- | ------------- | ---- | ------------------- |
| 0 | 4 | I | 数据包长度 |
| 4 | 4 | I | 固定 0x00120001 |
| 8 | 4 | I | 数据包类型 |
| 12 | 4 | I | 递增数据包编号 |
| 16 | 2 | H | 固定 0x0000 |
之后是有效载荷。
# 数据包类型表:
+ 0x7 客户端发送认证信息
+ 0x8 服务端回应认证结果
+ 0x2 客户端发送心跳包,有效载荷:'[object Object]'
+ 0x3 服务端回应心跳包,会带上在线人数等信息,返回 JSON
+ 0x3e8 实时弹幕更新,返回列表,[0]弹幕信息,[1]弹幕文本
Args:
data_type (VideoOnlineMonitor.DataType): 数据包类型枚举。
Returns:
bytes: 打包好的数据。
"""
packed_data = bytearray()
packed_data += struct.pack(">I", 0x00120001)
packed_data += struct.pack(">I", data_type.value)
packed_data += struct.pack(">I", number)
packed_data += struct.pack(">H", 0)
packed_data += data
packed_data = struct.pack(">I", len(packed_data) + 4) + packed_data
return bytes(packed_data)
@staticmethod
def __unpack(data: bytes):
"""
解包数据。
Args:
data (bytes): 原始数据。
Returns:
tuple(dict): 解包后的数据。
"""
offset = 0
real_data = []
while offset < len(data):
region_header = struct.unpack(">IIII", data[:16])
region_data = data[offset: offset + region_header[0]]
real_data.append(
{
"type": region_header[2],
"number": region_header[3],
"data": json.loads(
region_data[offset + 18: offset + 18 + (region_header[0] - 16)]
),
}
)
offset += region_header[0]
return tuple(real_data)
class VideoQuality(Enum):
"""
视频的视频流分辨率枚举
- _360P: 流畅 360P
- _480P: 清晰 480P
- _720P: 高清 720P60
- _1080P: 高清 1080P
- _1080P_PLUS: 高清 1080P 高码率
- _1080P_60: 高清 1080P 60 帧码率
- _4K: 超清 4K
- HDR: 真彩 HDR
- DOLBY: 杜比视界
- _8K: 超高清 8K
"""
_360P = 16
_480P = 32
_720P = 64
_1080P = 80
_1080P_PLUS = 112
_1080P_60 = 116
_4K = 120
HDR = 125
DOLBY = 126
_8K = 127
class VideoCodecs(Enum):
"""
视频的视频流编码枚举
- HEV: HEVC(H.265)
- AVC: AVC(H.264)
- AV1: AV1
"""
HEV = "hev"
AVC = "avc"
AV1 = "av01"
class AudioQuality(Enum):
"""
视频的音频流清晰度枚举
- _64K: 64K
- _132K: 132K
- _192K: 192K
- HI_RES: Hi-Res 无损
- DOLBY: 杜比全景声
"""
_64K = 30216
_132K = 30232
DOLBY = 30250
HI_RES = 30251
_192K = 30280
@dataclass
class VideoStreamDownloadURL:
"""
(@dataclass)
视频流 URL 类
Attributes:
url (str) : 视频流 url
video_quality (VideoQuality): 视频流清晰度
video_codecs (VideoCodecs) : 视频流编码
"""
url: str
video_quality: VideoQuality
video_codecs: VideoCodecs
@dataclass
class AudioStreamDownloadURL:
"""
(@dataclass)
音频流 URL 类
Attributes:
url (str) : 音频流 url
audio_quality (AudioQuality): 音频流清晰度
"""
url: str
audio_quality: AudioQuality
@dataclass
class FLVStreamDownloadURL:
"""
(@dataclass)
FLV 视频流
Attributes:
url (str): FLV 流 url
"""
url: str
@dataclass
class HTML5MP4DownloadURL:
"""
(@dataclass)
可供 HTML5 播放的 mp4 视频流
Attributes:
url (str): HTML5 mp4 视频流
"""
url: str
@dataclass
class EpisodeTryMP4DownloadURL:
"""
(@dataclass)
番剧/课程试看的 mp4 播放流
Attributes:
url (str): 番剧试看的 mp4 播放流
"""
url: str
class VideoDownloadURLDataDetecter:
"""
`Video.get_download_url` 返回结果解析类。
在调用 `Video.get_download_url` 之后可以将代入 `VideoDownloadURLDataDetecter`,此类将一键解析。
目前支持:
- 视频清晰度: 360P, 480P, 720P, 1080P, 1080P 高码率, 1080P 60 帧, 4K, HDR, 杜比视界, 8K
- 视频编码: HEVC(H.265), AVC(H.264), AV1
- 音频清晰度: 64K, 132K, Hi-Res 无损音效, 杜比全景声, 192K
- FLV 视频流
- 番剧/课程试看视频流
"""
def __init__(self, data: dict):
"""
Args:
data (dict): `Video.get_download_url` 返回的结果
"""
self.__data = data
def check_video_and_audio_stream(self) -> bool:
"""
判断是否为音视频分离流
Returns:
bool: 是否为音视频分离流
"""
if "dash" in self.__data.keys():
return True
return False
def check_flv_stream(self) -> bool:
"""
判断是否为 FLV 视频流
Returns:
bool: 是否为 FLV 视频流
"""
if "durl" in self.__data.keys():
if self.__data["format"].startswith("flv"):
return True
return False
def check_html5_mp4_stream(self) -> bool:
"""
判断是否为 HTML5 可播放的 mp4 视频流
Returns:
bool: 是否为 HTML5 可播放的 mp4 视频流
"""
if "durl" in self.__data.keys():
if self.__data["format"].startswith("mp4"):
if self.__data.get("is_html5") == True:
return True
return False
def check_episode_try_mp4_stream(self):
"""
判断是否为番剧/课程试看的 mp4 视频流
Returns:
bool: 是否为番剧试看的 mp4 视频流
"""
if "durl" in self.__data.keys():
if self.__data["format"].startswith("mp4"):
if self.__data.get("is_html5") != True:
return True
return False
def detect_all(self):
"""
解析并返回所有数据
Returns:
List[VideoStreamDownloadURL | AudioStreamDownloadURL | FLVStreamDownloadURL | HTML5MP4DownloadURL | EpisodeTryMP4DownloadURL]: 所有的视频/音频流
"""
return self.detect()
def detect(
self,
video_max_quality: VideoQuality = VideoQuality._8K,
audio_max_quality: AudioQuality = AudioQuality._192K,
video_min_quality: VideoQuality = VideoQuality._360P,
audio_min_quality: AudioQuality = AudioQuality._64K,
video_accepted_qualities: List[VideoQuality] = [
item
for _, item in VideoQuality.__dict__.items()
if isinstance(item, VideoQuality)
],
audio_accepted_qualities: List[AudioQuality] = [
item
for _, item in AudioQuality.__dict__.items()
if isinstance(item, AudioQuality)
],
codecs: List[VideoCodecs] = [VideoCodecs.AV1, VideoCodecs.AVC, VideoCodecs.HEV],
no_dolby_video: bool = False,
no_dolby_audio: bool = False,
no_hdr: bool = False,
no_hires: bool = False,
) -> List[
Union[
VideoStreamDownloadURL,
AudioStreamDownloadURL,
FLVStreamDownloadURL,
HTML5MP4DownloadURL,
EpisodeTryMP4DownloadURL,
]
]:
"""
解析数据
Args:
**以下参数仅能在音视频流分离的情况下产生作用,flv / mp4 试看流 / html5 mp4 流下以下参数均没有作用**
video_max_quality (VideoQuality, optional) : 设置提取的视频流清晰度最大值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._8K.
audio_max_quality (AudioQuality, optional) : 设置提取的音频流清晰度最大值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._192K.
video_min_quality (VideoQuality, optional) : 设置提取的视频流清晰度最小值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._360P.
audio_min_quality (AudioQuality, optional) : 设置提取的音频流清晰度最小值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._64K.
video_accepted_qualities(List[VideoQuality], optional): 设置允许的所有视频流清晰度. Defaults to ALL.
audio_accepted_qualities(List[AudioQuality], optional): 设置允许的所有音频清晰度. Defaults to ALL.
codecs (List[VideoCodecs], optional) : 设置所有允许提取出来的视频编码. 此项不会忽略 HDR/杜比. Defaults to ALL codecs.
no_dolby_video (bool, optional) : 是否禁止提取杜比视界视频流. Defaults to False.
no_dolby_audio (bool, optional) : 是否禁止提取杜比全景声音频流. Defaults to False.
no_hdr (bool, optional) : 是否禁止提取 HDR 视频流. Defaults to False.
no_hires (bool, optional) : 是否禁止提取 Hi-Res 音频流. Defaults to False.
Returns:
List[VideoStreamDownloadURL | AudioStreamDownloadURL | FLVStreamDownloadURL | HTML5MP4DownloadURL | EpisodeTryMP4DownloadURL]: 提取出来的视频/音频流
"""
if "durl" in self.__data.keys():
if self.__data["format"].startswith("flv"):
# FLV 视频流
return [FLVStreamDownloadURL(url=self.__data["durl"][0]["url"])]
else:
if self.check_html5_mp4_stream():
# HTML5 MP4 视频流
return [HTML5MP4DownloadURL(url=self.__data["durl"][0]["url"])]
else:
# 会员番剧试看 MP4 流
return [EpisodeTryMP4DownloadURL(url=self.__data["durl"][0]["url"])]
else:
# 正常情况
streams = []
videos_data = self.__data["dash"]["video"]
audios_data = self.__data["dash"]["audio"]
flac_data = self.__data["dash"]["flac"]
dolby_data = self.__data["dash"]["dolby"]
for video_data in videos_data:
video_stream_url = video_data["baseUrl"]
video_stream_quality = VideoQuality(video_data["id"])
if video_stream_quality == VideoQuality.HDR and no_hdr:
continue
if video_stream_quality == VideoQuality.DOLBY and no_dolby_video:
continue
if (
video_stream_quality != VideoQuality.DOLBY
and video_stream_quality != VideoQuality.HDR
and video_stream_quality.value > video_max_quality.value
):
continue
if (
video_stream_quality != VideoQuality.DOLBY
and video_stream_quality != VideoQuality.HDR
and video_stream_quality.value < video_min_quality.value
):
continue
if (
video_stream_quality != VideoQuality.DOLBY
and video_stream_quality != VideoQuality.HDR
and (not video_stream_quality in video_accepted_qualities)
):
continue
video_stream_codecs = None
for val in VideoCodecs:
if val.value in video_data["codecs"]:
video_stream_codecs = val
if (not video_stream_codecs in codecs) and (
video_stream_codecs != None
):
continue
video_stream = VideoStreamDownloadURL(
url=video_stream_url,
video_quality=video_stream_quality,
video_codecs=video_stream_codecs, # type: ignore
)
streams.append(video_stream)
if audios_data:
for audio_data in audios_data:
audio_stream_url = audio_data["baseUrl"]
audio_stream_quality = AudioQuality(audio_data["id"])
if audio_stream_quality.value > audio_max_quality.value:
continue
if audio_stream_quality.value < audio_min_quality.value:
continue
if not audio_stream_quality in audio_accepted_qualities:
continue
audio_stream = AudioStreamDownloadURL(
url=audio_stream_url, audio_quality=audio_stream_quality
)
streams.append(audio_stream)
if flac_data and (not no_hires):
if flac_data["audio"]:
flac_stream_url = flac_data["audio"]["baseUrl"]
flac_stream_quality = AudioQuality(flac_data["audio"]["id"])
flac_stream = AudioStreamDownloadURL(
url=flac_stream_url, audio_quality=flac_stream_quality
)
streams.append(flac_stream)
if dolby_data and (not no_dolby_audio):
if dolby_data["audio"]:
dolby_stream_data = dolby_data["audio"][0]
dolby_stream_url = dolby_stream_data["baseUrl"]
dolby_stream_quality = AudioQuality(dolby_stream_data["id"])
dolby_stream = AudioStreamDownloadURL(
url=dolby_stream_url, audio_quality=dolby_stream_quality
)
streams.append(dolby_stream)
return streams
def detect_best_streams(
self,
video_max_quality: VideoQuality = VideoQuality._8K,
audio_max_quality: AudioQuality = AudioQuality._192K,
video_min_quality: VideoQuality = VideoQuality._360P,
audio_min_quality: AudioQuality = AudioQuality._64K,
video_accepted_qualities: List[VideoQuality] = [
item
for _, item in VideoQuality.__dict__.items()
if isinstance(item, VideoQuality)
],
audio_accepted_qualities: List[AudioQuality] = [
item
for _, item in AudioQuality.__dict__.items()
if isinstance(item, AudioQuality)
],
codecs: List[VideoCodecs] = [VideoCodecs.AV1, VideoCodecs.AVC, VideoCodecs.HEV],
no_dolby_video: bool = False,
no_dolby_audio: bool = False,
no_hdr: bool = False,
no_hires: bool = False,
) -> Union[
List[FLVStreamDownloadURL],
List[HTML5MP4DownloadURL],
List[EpisodeTryMP4DownloadURL],
List[Union[VideoStreamDownloadURL, AudioStreamDownloadURL, None]],
]:
"""
提取出分辨率、音质等信息最好的音视频流。
Args:
**以下参数仅能在音视频流分离的情况下产生作用,flv / mp4 试看流 / html5 mp4 流下以下参数均没有作用**
video_max_quality (VideoQuality) : 设置提取的视频流清晰度最大值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._8K.
audio_max_quality (AudioQuality) : 设置提取的音频流清晰度最大值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._192K.
video_min_quality (VideoQuality, optional) : 设置提取的视频流清晰度最小值,设置此参数绝对不会禁止 HDR/杜比. Defaults to VideoQuality._360P.
audio_min_quality (AudioQuality, optional) : 设置提取的音频流清晰度最小值. 设置此参数绝对不会禁止 Hi-Res/杜比. Defaults to AudioQuality._64K.
video_accepted_qualities(List[VideoQuality], optional): 设置允许的所有视频流清晰度. Defaults to ALL.
audio_accepted_qualities(List[AudioQuality], optional): 设置允许的所有音频清晰度. Defaults to ALL.
codecs (List[VideoCodecs]) : 设置所有允许提取出来的视频编码. 在数组中越靠前的编码选择优先级越高. 此项不会忽略 HDR/杜比. Defaults to [VideoCodecs.AV1, VideoCodecs.AVC, VideoCodecs.HEV].
no_dolby_video (bool) : 是否禁止提取杜比视界视频流. Defaults to False.
no_dolby_audio (bool) : 是否禁止提取杜比全景声音频流. Defaults to False.
no_hdr (bool) : 是否禁止提取 HDR 视频流. Defaults to False.
no_hires (bool) : 是否禁止提取 Hi-Res 音频流. Defaults to False.
Returns:
List[VideoStreamDownloadURL | AudioStreamDownloadURL | FLVStreamDownloadURL | HTML5MP4DownloadURL | None]: FLV 视频流 / HTML5 MP4 视频流 / 番剧或课程试看 MP4 视频流返回 `[FLVStreamDownloadURL | HTML5MP4StreamDownloadURL | EpisodeTryMP4DownloadURL]`, 否则为 `[VideoStreamDownloadURL, AudioStreamDownloadURL]`, 如果未匹配上任何合适的流则对应的位置位 `None`
"""
if self.check_flv_stream():
return self.detect_all() # type: ignore
elif self.check_html5_mp4_stream():
return self.detect_all() # type: ignore
elif self.check_episode_try_mp4_stream():
return self.detect_all() # type: ignore
else:
data = self.detect(
video_max_quality=video_max_quality,
audio_max_quality=audio_max_quality,
video_min_quality=video_min_quality,
audio_min_quality=audio_min_quality,
video_accepted_qualities=video_accepted_qualities,
audio_accepted_qualities=audio_accepted_qualities,
codecs=codecs,
)
video_streams = []
audio_streams = []
for stream in data:
if isinstance(stream, VideoStreamDownloadURL):
video_streams.append(stream)
if isinstance(stream, AudioStreamDownloadURL):
audio_streams.append(stream)
def video_stream_cmp(
s1: VideoStreamDownloadURL, s2: VideoStreamDownloadURL
):
# 杜比/HDR 优先
if s1.video_quality == VideoQuality.DOLBY and (not no_dolby_video):
return 1
elif s2.video_quality == VideoQuality.DOLBY and (not no_dolby_video):
return -1
elif s1.video_quality == VideoQuality.HDR and (not no_hdr):
return 1
elif s2.video_quality == VideoQuality.HDR and (not no_hdr):
return -1
if s1.video_quality.value != s2.video_quality.value:
return s1.video_quality.value - s2.video_quality.value
# Detect the high quality stream to the end.
elif s1.video_codecs.value != s2.video_codecs.value:
return codecs.index(s2.video_codecs) - codecs.index(s1.video_codecs)
return -1
def audio_stream_cmp(
s1: AudioStreamDownloadURL, s2: AudioStreamDownloadURL
):
# 杜比/Hi-Res 优先
if s1.audio_quality == AudioQuality.DOLBY and (not no_dolby_audio):
return 1
if s2.audio_quality == AudioQuality.DOLBY and (not no_dolby_audio):
return -1
if s1.audio_quality == AudioQuality.HI_RES and (not no_hires):
return 1
if s2.audio_quality == AudioQuality.HI_RES and (not no_hires):
return -1
return s1.audio_quality.value - s2.audio_quality.value
video_streams.sort(key=cmp_to_key(video_stream_cmp), reverse=True)
audio_streams.sort(key=cmp_to_key(audio_stream_cmp), reverse=True)
if len(video_streams) == 0:
video_streams = [None]
if len(audio_streams) == 0:
audio_streams = [None]
return [video_streams[0], audio_streams[0]]