| import base64 |
| import json |
| import secrets |
| import string |
| from datetime import timedelta |
| from enum import IntEnum |
| from ipaddress import IPv4Address |
| from random import randint |
| from typing import Annotated, Any, Optional |
|
|
| from Cryptodome.Cipher import AES |
| from Cryptodome.Util.Padding import pad |
| from fastapi import Query |
|
|
| from hibiapi.api.netease.constants import NeteaseConstants |
| from hibiapi.utils.cache import cache_config |
| from hibiapi.utils.decorators import enum_auto_doc |
| from hibiapi.utils.exceptions import UpstreamAPIException |
| from hibiapi.utils.net import catch_network_error |
| from hibiapi.utils.routing import BaseEndpoint, dont_route |
|
|
|
|
| @enum_auto_doc |
| class SearchType(IntEnum): |
| """搜索内容类型""" |
|
|
| SONG = 1 |
| """单曲""" |
| ALBUM = 10 |
| """专辑""" |
| ARTIST = 100 |
| """歌手""" |
| PLAYLIST = 1000 |
| """歌单""" |
| USER = 1002 |
| """用户""" |
| MV = 1004 |
| """MV""" |
| LYRICS = 1006 |
| """歌词""" |
| DJ = 1009 |
| """主播电台""" |
| VIDEO = 1014 |
| """视频""" |
|
|
|
|
| @enum_auto_doc |
| class BitRateType(IntEnum): |
| """歌曲码率""" |
|
|
| LOW = 64000 |
| MEDIUM = 128000 |
| STANDARD = 198000 |
| HIGH = 320000 |
|
|
|
|
| @enum_auto_doc |
| class MVResolutionType(IntEnum): |
| """MV分辨率""" |
|
|
| QVGA = 240 |
| VGA = 480 |
| HD = 720 |
| FHD = 1080 |
|
|
|
|
| @enum_auto_doc |
| class RecordPeriodType(IntEnum): |
| """听歌记录时段类型""" |
|
|
| WEEKLY = 1 |
| """本周""" |
| ALL = 0 |
| """所有时段""" |
|
|
|
|
| class _EncryptUtil: |
| alphabets = bytearray(ord(char) for char in string.ascii_letters + string.digits) |
|
|
| @staticmethod |
| def _aes(data: bytes, key: bytes) -> bytes: |
| data = pad(data, 16) if len(data) % 16 else data |
| return base64.encodebytes( |
| AES.new( |
| key=key, |
| mode=AES.MODE_CBC, |
| iv=NeteaseConstants.AES_IV, |
| ).encrypt(data) |
| ) |
|
|
| @staticmethod |
| def _rsa(data: bytes): |
| result = pow( |
| base=int(data.hex(), 16), |
| exp=NeteaseConstants.RSA_PUBKEY, |
| mod=NeteaseConstants.RSA_MODULUS, |
| ) |
| return f"{result:0>256x}" |
|
|
| @classmethod |
| def encrypt(cls, data: dict[str, Any]) -> dict[str, str]: |
| secret = bytes(secrets.choice(cls.alphabets) for _ in range(16)) |
| secure_key = cls._rsa(bytes(reversed(secret))) |
| return { |
| "params": cls._aes( |
| data=cls._aes( |
| data=json.dumps(data).encode(), |
| key=NeteaseConstants.AES_KEY, |
| ), |
| key=secret, |
| ).decode("ascii"), |
| "encSecKey": secure_key, |
| } |
|
|
|
|
| class NeteaseEndpoint(BaseEndpoint): |
| def _construct_headers(self): |
| headers = self.client.headers.copy() |
| headers["X-Real-IP"] = str( |
| IPv4Address( |
| randint( |
| int(NeteaseConstants.SOURCE_IP_SEGMENT.network_address), |
| int(NeteaseConstants.SOURCE_IP_SEGMENT.broadcast_address), |
| ) |
| ) |
| ) |
| return headers |
|
|
| @dont_route |
| @catch_network_error |
| async def request( |
| self, endpoint: str, *, params: Optional[dict[str, Any]] = None |
| ) -> dict[str, Any]: |
| params = { |
| **(params or {}), |
| "csrf_token": self.client.cookies.get("__csrf", ""), |
| } |
| response = await self.client.post( |
| self._join( |
| NeteaseConstants.HOST, |
| endpoint=endpoint, |
| params=params, |
| ), |
| headers=self._construct_headers(), |
| data=_EncryptUtil.encrypt(params), |
| ) |
| response.raise_for_status() |
| if not response.text.strip(): |
| raise UpstreamAPIException( |
| f"Upstream API {endpoint=} returns blank content" |
| ) |
| return response.json() |
|
|
| async def search( |
| self, |
| *, |
| s: str, |
| search_type: SearchType = SearchType.SONG, |
| limit: int = 20, |
| offset: int = 0, |
| ): |
| return await self.request( |
| "api/cloudsearch/pc", |
| params={ |
| "s": s, |
| "type": search_type, |
| "limit": limit, |
| "offset": offset, |
| "total": True, |
| }, |
| ) |
|
|
| async def artist(self, *, id: int): |
| return await self.request( |
| "weapi/v1/artist/{artist_id}", |
| params={ |
| "artist_id": id, |
| }, |
| ) |
|
|
| async def album(self, *, id: int): |
| return await self.request( |
| "weapi/v1/album/{album_id}", |
| params={ |
| "album_id": id, |
| }, |
| ) |
|
|
| async def detail( |
| self, |
| *, |
| id: Annotated[list[int], Query()], |
| ): |
| return await self.request( |
| "api/v3/song/detail", |
| params={ |
| "c": json.dumps( |
| [{"id": str(i)} for i in id], |
| ), |
| }, |
| ) |
|
|
| @cache_config(ttl=timedelta(minutes=20)) |
| async def song( |
| self, |
| *, |
| id: Annotated[list[int], Query()], |
| br: BitRateType = BitRateType.STANDARD, |
| ): |
| return await self.request( |
| "weapi/song/enhance/player/url", |
| params={ |
| "ids": [str(i) for i in id], |
| "br": br, |
| }, |
| ) |
|
|
| async def playlist(self, *, id: int): |
| return await self.request( |
| "weapi/v6/playlist/detail", |
| params={ |
| "id": id, |
| "total": True, |
| "offset": 0, |
| "limit": 1000, |
| "n": 1000, |
| }, |
| ) |
|
|
| async def lyric(self, *, id: int): |
| return await self.request( |
| "weapi/song/lyric", |
| params={ |
| "id": id, |
| "os": "pc", |
| "lv": -1, |
| "kv": -1, |
| "tv": -1, |
| }, |
| ) |
|
|
| async def mv(self, *, id: int): |
| return await self.request( |
| "api/v1/mv/detail", |
| params={ |
| "id": id, |
| }, |
| ) |
|
|
| async def mv_url( |
| self, |
| *, |
| id: int, |
| res: MVResolutionType = MVResolutionType.FHD, |
| ): |
| return await self.request( |
| "weapi/song/enhance/play/mv/url", |
| params={ |
| "id": id, |
| "r": res, |
| }, |
| ) |
|
|
| async def comments(self, *, id: int, offset: int = 0, limit: int = 1): |
| return await self.request( |
| "weapi/v1/resource/comments/R_SO_4_{song_id}", |
| params={ |
| "song_id": id, |
| "offset": offset, |
| "total": True, |
| "limit": limit, |
| }, |
| ) |
|
|
| async def record(self, *, id: int, period: RecordPeriodType = RecordPeriodType.ALL): |
| return await self.request( |
| "weapi/v1/play/record", |
| params={ |
| "uid": id, |
| "type": period, |
| }, |
| ) |
|
|
| async def djradio(self, *, id: int): |
| return await self.request( |
| "api/djradio/v2/get", |
| params={ |
| "id": id, |
| }, |
| ) |
|
|
| async def dj(self, *, id: int, offset: int = 0, limit: int = 20, asc: bool = False): |
| |
| return await self.request( |
| "weapi/dj/program/byradio", |
| params={ |
| "radioId": id, |
| "offset": offset, |
| "limit": limit, |
| "asc": asc, |
| }, |
| ) |
|
|
| async def detail_dj(self, *, id: int): |
| return await self.request( |
| "api/dj/program/detail", |
| params={ |
| "id": id, |
| }, |
| ) |
|
|
| async def user(self, *, id: int): |
| return await self.request( |
| "weapi/v1/user/detail/{id}", |
| params={"id": id}, |
| ) |
|
|
| async def user_playlist(self, *, id: int, limit: int = 50, offset: int = 0): |
| return await self.request( |
| "weapi/user/playlist", |
| params={ |
| "uid": id, |
| "limit": limit, |
| "offset": offset, |
| }, |
| ) |
|
|