void
add feature sending chat
edaa5d4
raw
history blame contribute delete
No virus
11.6 kB
# -*- coding: utf-8 -*-
import asyncio
import logging
from typing import *
import aiohttp
import yarl
from . import ws_base
from .. import utils
import time
__all__ = (
'BLiveClient',
)
logger = logging.getLogger('blivedm')
SEND_CHAT_URL = 'https://api.live.bilibili.com/msg/send'
UID_INIT_URL = 'https://api.bilibili.com/x/web-interface/nav'
BUVID_INIT_URL = 'https://data.bilibili.com/v/'
ROOM_INIT_URL = 'https://api.live.bilibili.com/xlive/web-room/v1/index/getInfoByRoom'
DANMAKU_SERVER_CONF_URL = 'https://api.live.bilibili.com/xlive/web-room/v1/index/getDanmuInfo'
DEFAULT_DANMAKU_SERVER_LIST = [
{'host': 'broadcastlv.chat.bilibili.com', 'port': 2243, 'wss_port': 443, 'ws_port': 2244}
]
class BLiveClient(ws_base.WebSocketClientBase):
"""
web端客户端
:param room_id: URL中的房间ID,可以用短ID
:param uid: B站用户ID,0表示未登录,None表示自动获取
:param session: cookie、连接池
:param heartbeat_interval: 发送心跳包的间隔时间(秒)
"""
def __init__(
self,
room_id: int,
*,
uid: Optional[int] = None,
session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval=30,
):
super().__init__(session, heartbeat_interval)
self._tmp_room_id = room_id
"""用来init_room的临时房间ID,可以用短ID"""
self._uid = uid
# 在调用init_room后初始化的字段
self._room_owner_uid: Optional[int] = None
"""主播用户ID"""
self._host_server_list: Optional[List[dict]] = None
"""
弹幕服务器列表
`[{host: "tx-bj4-live-comet-04.chat.bilibili.com", port: 2243, wss_port: 443, ws_port: 2244}, ...]`
"""
self._host_server_token: Optional[str] = None
"""连接弹幕服务器用的token"""
@property
def tmp_room_id(self) -> int:
"""
构造时传进来的room_id参数
"""
return self._tmp_room_id
@property
def room_owner_uid(self) -> Optional[int]:
"""
主播用户ID,调用init_room后初始化
"""
return self._room_owner_uid
@property
def uid(self) -> Optional[int]:
"""
当前登录的用户ID,未登录则为0,调用init_room后初始化
"""
return self._uid
async def init_room(self):
"""
初始化连接房间需要的字段
:return: True代表没有降级,如果需要降级后还可用,重载这个函数返回True
"""
if self._uid is None:
if not await self._init_uid():
logger.warning('room=%d _init_uid() failed', self._tmp_room_id)
self._uid = 0
if self._get_buvid() == '':
if not await self._init_buvid():
logger.warning('room=%d _init_buvid() failed', self._tmp_room_id)
res = True
if not await self._init_room_id_and_owner():
res = False
# 失败了则降级
self._room_id = self._tmp_room_id
self._room_owner_uid = 0
if not await self._init_host_server():
res = False
# 失败了则降级
self._host_server_list = DEFAULT_DANMAKU_SERVER_LIST
self._host_server_token = None
return res
async def _init_uid(self):
cookies = self._session.cookie_jar.filter_cookies(yarl.URL(UID_INIT_URL))
sessdata_cookie = cookies.get('SESSDATA', None)
if sessdata_cookie is None or sessdata_cookie.value == '':
# cookie都没有,不用请求了
self._uid = 0
return True
try:
async with self._session.get(
UID_INIT_URL,
headers={'User-Agent': utils.USER_AGENT},
) as res:
if res.status != 200:
logger.warning('room=%d _init_uid() failed, status=%d, reason=%s', self._tmp_room_id,
res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
if data['code'] == -101:
# 未登录
self._uid = 0
return True
logger.warning('room=%d _init_uid() failed, message=%s', self._tmp_room_id,
data['message'])
return False
data = data['data']
if not data['isLogin']:
# 未登录
self._uid = 0
else:
self._uid = data['mid']
return True
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_uid() failed:', self._tmp_room_id)
return False
def _get_buvid(self):
cookies = self._session.cookie_jar.filter_cookies(yarl.URL(BUVID_INIT_URL))
buvid_cookie = cookies.get('buvid3', None)
if buvid_cookie is None:
return ''
return buvid_cookie.value
async def _init_buvid(self):
try:
async with self._session.get(
BUVID_INIT_URL,
headers={'User-Agent': utils.USER_AGENT},
) as res:
if res.status != 200:
logger.warning('room=%d _init_buvid() status error, status=%d, reason=%s',
self._tmp_room_id, res.status, res.reason)
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_buvid() exception:', self._tmp_room_id)
return self._get_buvid() != ''
async def _init_room_id_and_owner(self):
try:
async with self._session.get(
ROOM_INIT_URL,
headers={'User-Agent': utils.USER_AGENT},
params={
'room_id': self._tmp_room_id
},
) as res:
if res.status != 200:
logger.warning('room=%d _init_room_id_and_owner() failed, status=%d, reason=%s', self._tmp_room_id,
res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('room=%d _init_room_id_and_owner() failed, message=%s', self._tmp_room_id,
data['message'])
return False
if not self._parse_room_init(data['data']):
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_room_id_and_owner() failed:', self._tmp_room_id)
return False
return True
def _parse_room_init(self, data):
room_info = data['room_info']
self._room_id = room_info['room_id']
self._room_owner_uid = room_info['uid']
return True
async def _init_host_server(self):
try:
async with self._session.get(
DANMAKU_SERVER_CONF_URL,
headers={'User-Agent': utils.USER_AGENT},
params={
'id': self._room_id,
'type': 0
},
) as res:
if res.status != 200:
logger.warning('room=%d _init_host_server() failed, status=%d, reason=%s', self._room_id,
res.status, res.reason)
return False
data = await res.json()
if data['code'] != 0:
logger.warning('room=%d _init_host_server() failed, message=%s', self._room_id, data['message'])
return False
if not self._parse_danmaku_server_conf(data['data']):
return False
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room=%d _init_host_server() failed:', self._room_id)
return False
return True
def _parse_danmaku_server_conf(self, data):
self._host_server_list = data['host_list']
self._host_server_token = data['token']
if not self._host_server_list:
logger.warning('room=%d _parse_danmaku_server_conf() failed: host_server_list is empty', self._room_id)
return False
return True
async def _on_before_ws_connect(self, retry_count):
"""
在每次建立连接之前调用,可以用来初始化房间
"""
# 重连次数太多则重新init_room,保险
reinit_period = max(3, len(self._host_server_list or ()))
if retry_count > 0 and retry_count % reinit_period == 0:
self._need_init_room = True
await super()._on_before_ws_connect(retry_count)
def _get_ws_url(self, retry_count) -> str:
"""
返回WebSocket连接的URL,可以在这里做故障转移和负载均衡
"""
host_server = self._host_server_list[retry_count % len(self._host_server_list)]
return f"wss://{host_server['host']}:{host_server['wss_port']}/sub"
async def _send_auth(self):
"""
发送认证包
"""
auth_params = {
'uid': self._uid,
'roomid': self._room_id,
'protover': 3,
'platform': 'web',
'type': 2,
'buvid': self._get_buvid(),
}
if self._host_server_token is not None:
auth_params['key'] = self._host_server_token
await self._websocket.send_bytes(self._make_packet(auth_params, ws_base.Operation.AUTH))
async def send_bulletchat(self, bulletchat: ws_base.BulletChat = None):
"""
:param bulletchat:BulletChat对象。
"""
cookies = self._session.cookie_jar.filter_cookies(yarl.URL(SEND_CHAT_URL))
sessdata_cookie = cookies.get('SESSDATA', None)
if sessdata_cookie is None or sessdata_cookie.value == '':
return False
headers = { 'referer': 'https://live.bilibili.com/',
'User-Agent': utils.USER_AGENT}
dict_data = {"bubble": bulletchat.bubble,
"msg": bulletchat.msg,
"color": bulletchat.color,
"mode": bulletchat.mode,
"fontsize": bulletchat.fontsize,
"rnd": str(int(time.time())),
"roomid": self.room_id,
"csrf": cookies['bili_jct'].value,
"csrf_token": cookies['bili_jct'].value}
dict_cookies = {"SESSDATA": cookies['SESSDATA'].value,
"buvid3": cookies['buvid3'].value,
"bili_jct": cookies['bili_jct'].value,}
try:
async with self._session.request(
method="POST",
url=SEND_CHAT_URL,
data=dict_data,
headers=headers,
cookies=dict_cookies,
) as res:
if res.status != 200:
logger.warning('send_bulletchat() failed, status=%d, reason=%s', res.status, res.reason)
return False
# print(res)
return True
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('send_bulletchat() failed:')
return False