# -*- coding: utf-8 -*-
import asyncio
import datetime
import hashlib
import hmac
import json
import logging
import uuid
from typing import *

import aiohttp

from . import ws_base

__all__ = (
    'OpenLiveClient',
)

logger = logging.getLogger('blivedm')

START_URL = 'https://live-open.biliapi.com/v2/app/start'
HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat'
END_URL = 'https://live-open.biliapi.com/v2/app/end'


class OpenLiveClient(ws_base.WebSocketClientBase):
    """
    Client for the open platform

    Documentation: https://open-live.bilibili.com/document/

    :param access_key_id: the access_key_id applied for on the open platform
    :param access_key_secret: the access_key_secret applied for on the open platform
    :param app_id: ID of the project created on the open platform
    :param room_owner_auth_code: the streamer's identity code
    :param session: cookies and connection pool
    :param heartbeat_interval: interval between connection heartbeat packets (seconds)
    :param game_heartbeat_interval: interval between project heartbeat requests (seconds)
    """

    def __init__(
        self,
        access_key_id: str,
        access_key_secret: str,
        app_id: int,
        room_owner_auth_code: str,
        *,
        session: Optional[aiohttp.ClientSession] = None,
        heartbeat_interval=30,
        game_heartbeat_interval=20,
    ):
        super().__init__(session, heartbeat_interval)

        self._access_key_id = access_key_id
        self._access_key_secret = access_key_secret
        self._app_id = app_id
        self._room_owner_auth_code = room_owner_auth_code
        self._game_heartbeat_interval = game_heartbeat_interval

        # Fields initialized after init_room is called
        self._room_owner_uid: Optional[int] = None
        """Streamer's user ID"""
        self._room_owner_open_id: Optional[str] = None
        """Streamer's Open ID"""
        self._host_server_url_list: Optional[List[str]] = []
        """List of danmaku server URLs"""
        self._auth_body: Optional[str] = None
        """Content of the auth packet used to connect to the danmaku server"""
        self._game_id: Optional[str] = None
        """Project session ID"""

        # Fields initialized at runtime
        self._game_heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
        """Handle of the timer that sends project heartbeats"""
        self._game_heartbeat_tasks: Set[asyncio.Task] = set()
        """In-flight project heartbeat tasks; held so they are not garbage-collected"""

    @property
    def room_owner_uid(self) -> Optional[int]:
        """
        Streamer's user ID, initialized after init_room is called
        """
        return self._room_owner_uid

    @property
    def room_owner_open_id(self) -> Optional[str]:
        """
        Streamer's Open ID, initialized after init_room is called
        """
        return self._room_owner_open_id

    @property
    def room_owner_auth_code(self):
        """
        Streamer's identity code
        """
        return self._room_owner_auth_code

    @property
    def app_id(self):
        """
        ID of the project created on the open platform
        """
        return self._app_id

    @property
    def game_id(self) -> Optional[str]:
        """
        Project session ID, initialized after init_room is called
        """
        return self._game_id

    async def close(self):
        """
        Release the resources of this client. The client is unusable after this call
        """
        if self.is_running:
            logger.warning('room=%s is calling close(), but client is running', self.room_id)

        # Stop scheduling further project heartbeats before ending the project
        if self._game_heartbeat_timer_handle is not None:
            self._game_heartbeat_timer_handle.cancel()
            self._game_heartbeat_timer_handle = None
        await self._end_game()

        await super().close()

    def _request_open_live(self, url, body: dict):
        """
        Build a signed POST request to an open platform endpoint

        :param url: endpoint URL
        :param body: request body, serialized as JSON
        :return: the result of `self._session.post(...)`; callers use it with `async with`
        """
        body_bytes = json.dumps(body).encode('utf-8')
        headers = {
            'x-bili-accesskeyid': self._access_key_id,
            'x-bili-content-md5': hashlib.md5(body_bytes).hexdigest(),
            'x-bili-signature-method': 'HMAC-SHA256',
            'x-bili-signature-nonce': uuid.uuid4().hex,
            'x-bili-signature-version': '1.0',
            'x-bili-timestamp': str(int(datetime.datetime.now().timestamp())),
        }

        # The signature covers exactly the x-bili-* headers above, as "key:value" lines
        # in the order they were inserted. Headers added afterwards are not signed
        str_to_sign = '\n'.join(
            f'{key}:{value}'
            for key, value in headers.items()
        )
        signature = hmac.new(
            self._access_key_secret.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256
        ).hexdigest()
        headers['Authorization'] = signature

        headers['Content-Type'] = 'application/json'
        headers['Accept'] = 'application/json'
        return self._session.post(url, headers=headers, data=body_bytes)

    async def init_room(self):
        """
        Start the project and initialize the fields needed to connect to the room

        :return: whether it succeeded
        """
        if not await self._start_game():
            return False

        # Start the project heartbeat timer only once; later calls reuse the running timer
        if self._game_id != '' and self._game_heartbeat_timer_handle is None:
            self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later(
                self._game_heartbeat_interval, self._on_send_game_heartbeat
            )
        return True

    async def _start_game(self):
        """
        Call the project start endpoint and store the returned connection info

        :return: whether it succeeded
        """
        try:
            async with self._request_open_live(
                START_URL,
                {'code': self._room_owner_auth_code, 'app_id': self._app_id}
            ) as res:
                if res.status != 200:
                    logger.warning('_start_game() failed, status=%d, reason=%s', res.status, res.reason)
                    return False
                data = await res.json()
                if data['code'] != 0:
                    logger.warning('_start_game() failed, code=%d, message=%s, request_id=%s',
                                   data['code'], data['message'], data['request_id'])
                    return False
                if not self._parse_start_game(data['data']):
                    return False
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            logger.exception('_start_game() failed:')
            return False
        return True

    def _parse_start_game(self, data):
        """
        Copy the fields of a successful project start response into this client

        :param data: the `data` object of the start response
        :return: always True
        """
        self._game_id = data['game_info']['game_id']
        websocket_info = data['websocket_info']
        self._auth_body = websocket_info['auth_body']
        self._host_server_url_list = websocket_info['wss_link']
        anchor_info = data['anchor_info']
        self._room_id = anchor_info['room_id']
        self._room_owner_uid = anchor_info['uid']
        self._room_owner_open_id = anchor_info['open_id']
        return True

    async def _end_game(self):
        """
        End the project. It is recommended to make sure this function gets called when closing the client
        (close() calls it), otherwise it may be impossible to reconnect to the same room for a short time

        :return: whether it succeeded
        """
        # No project was started, so there is nothing to end
        if self._game_id in (None, ''):
            return True

        try:
            async with self._request_open_live(
                END_URL,
                {'app_id': self._app_id, 'game_id': self._game_id}
            ) as res:
                if res.status != 200:
                    logger.warning('room=%d _end_game() failed, status=%d, reason=%s',
                                   self._room_id, res.status, res.reason)
                    return False
                data = await res.json()
                code = data['code']
                if code != 0:
                    if code in (7000, 7003):
                        # The project being already closed also counts as success
                        return True

                    logger.warning('room=%d _end_game() failed, code=%d, message=%s, request_id=%s',
                                   self._room_id, code, data['message'], data['request_id'])
                    return False
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            logger.exception('room=%d _end_game() failed:', self._room_id)
            return False
        return True

    def _on_send_game_heartbeat(self):
        """
        Timer callback that periodically sends the project heartbeat
        """
        # Re-arm the timer first so the next heartbeat does not depend on this one finishing
        self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later(
            self._game_heartbeat_interval, self._on_send_game_heartbeat
        )
        # The event loop only keeps weak references to tasks, so hold a strong reference
        # until the task is done; otherwise it could be garbage-collected mid-execution
        task = asyncio.create_task(self._send_game_heartbeat())
        self._game_heartbeat_tasks.add(task)
        task.add_done_callback(self._game_heartbeat_tasks.discard)

    async def _send_game_heartbeat(self):
        """
        Send the project heartbeat

        :return: whether it succeeded
        """
        if self._game_id in (None, ''):
            # game_id is None or '' here, so it must be formatted with %s; %d would fail
            logger.warning('game=%s _send_game_heartbeat() failed, game_id not found', self._game_id)
            return False

        try:
            # Save it, in case game_id changes after an await
            game_id = self._game_id
            async with self._request_open_live(
                HEARTBEAT_URL,
                {'game_id': game_id}
            ) as res:
                if res.status != 200:
                    logger.warning('room=%d _send_game_heartbeat() failed, status=%d, reason=%s',
                                   self._room_id, res.status, res.reason)
                    return False
                data = await res.json()
                code = data['code']
                if code != 0:
                    logger.warning('room=%d _send_game_heartbeat() failed, code=%d, message=%s, request_id=%s',
                                   self._room_id, code, data['message'], data['request_id'])

                    if code == 7003 and self._game_id == game_id:
                        # The project was closed abnormally, possibly because the heartbeat timed out.
                        # The project needs to be started again
                        self._need_init_room = True
                        if self._websocket is not None and not self._websocket.closed:
                            await self._websocket.close()

                    return False
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            logger.exception('room=%d _send_game_heartbeat() failed:', self._room_id)
            return False
        return True

    async def _on_before_ws_connect(self, retry_count):
        """
        Called before each connection attempt; can be used to initialize the room

        :param retry_count: number of reconnection attempts so far
        """
        # Re-run init_room after too many reconnections, to be safe
        reinit_period = max(3, len(self._host_server_url_list or ()))
        if retry_count > 0 and retry_count % reinit_period == 0:
            self._need_init_room = True
        await super()._on_before_ws_connect(retry_count)

    def _get_ws_url(self, retry_count) -> str:
        """
        Return the WebSocket URL to connect to; failover and load balancing can be done here

        :param retry_count: number of reconnection attempts so far, used to rotate through the URL list
        """
        return self._host_server_url_list[retry_count % len(self._host_server_url_list)]

    async def _send_auth(self):
        """
        Send the auth packet
        """
        await self._websocket.send_bytes(self._make_packet(self._auth_body, ws_base.Operation.AUTH))