#!/usr/bin/env python3
"""HiFi-RestAPI: a FastAPI application that proxies requests to Tidal endpoints.

Credentials (client id/secret + refresh token) are read from ``TOKEN_FILE``
and/or environment variables at import time. Access tokens are obtained lazily
through the OAuth2 refresh-token grant and cached per credential.
"""
import asyncio
import json
import logging
import os
import random
import time
from contextlib import asynccontextmanager
from typing import Dict, List, Optional, Tuple, Union

import httpx
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware

logger = logging.getLogger(__name__)

load_dotenv()

# Shared HTTP client is created in app lifespan for connection reuse
_http_client: Optional[httpx.AsyncClient] = None

# One lock per credential to avoid global contention during token refreshes
_refresh_locks: Dict[str, asyncio.Lock] = {}

# Loaded credential set from token.json; each entry is enriched with an
# in-memory access-token cache ("access_token" / "expires_at").
_creds: List[dict] = []


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared ``httpx.AsyncClient`` on startup and close it on shutdown."""
    global _http_client
    # A fallback client may have been created by get_http_client() before
    # startup; remember it so it can be closed instead of leaked.
    previous = _http_client
    _http_client = httpx.AsyncClient(
        http2=True,
        timeout=httpx.Timeout(connect=3.0, read=12.0, write=8.0, pool=12.0),
        limits=httpx.Limits(
            max_keepalive_connections=200,
            max_connections=300,
            keepalive_expiry=30.0,
        ),
    )
    if previous is not None:
        await previous.aclose()
    try:
        yield
    finally:
        # Reset the global so a closed client is never handed out afterwards.
        client, _http_client = _http_client, None
        if client is not None:
            await client.aclose()


API_VERSION = "2.4"

app = FastAPI(
    title="HiFi-RestAPI",
    version=API_VERSION,
    description="Tidal Music Proxy",
    lifespan=lifespan,
)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Config (defaults act as fallback if token file missing)
CLIENT_ID = os.getenv("CLIENT_ID", "zU4XHVVkc2tDPo4t")
CLIENT_SECRET = os.getenv("CLIENT_SECRET", "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4=")
REFRESH_TOKEN: Optional[str] = os.getenv("REFRESH_TOKEN")
USER_ID = os.getenv("USER_ID")
TOKEN_FILE = os.getenv("TOKEN_FILE", "token.json")
COUNTRY_CODE = os.getenv("COUNTRY_CODE", "US")


def _make_credential(
    client_id: Optional[str],
    client_secret: Optional[str],
    refresh_token: Optional[str],
    user_id: Optional[str],
) -> dict:
    """Build a credential record with an empty access-token cache."""
    return {
        "client_id": client_id,
        "client_secret": client_secret,
        "refresh_token": refresh_token,
        "user_id": user_id,
        # Access tokens in file have unknown expiry; force refresh on first use
        "access_token": None,
        "expires_at": 0,
    }


if os.path.exists(TOKEN_FILE):
    with open(TOKEN_FILE, "r", encoding="utf-8") as tok:
        token_data = json.load(tok)

    # The file may hold a single credential object or a list of them.
    if isinstance(token_data, dict):
        token_data = [token_data]

    for entry in token_data:
        cred = _make_credential(
            entry.get("client_ID") or CLIENT_ID,
            entry.get("client_secret") or CLIENT_SECRET,
            entry.get("refresh_token") or REFRESH_TOKEN,
            entry.get("userID") or USER_ID,
        )
        # A credential without a refresh token can never obtain an access token.
        if cred["refresh_token"]:
            _creds.append(cred)

# Add env var credential if available and unique (simple check)
if REFRESH_TOKEN:
    # Avoid adding duplicate if it was already loaded from file with same refresh token
    if not any(c["refresh_token"] == REFRESH_TOKEN for c in _creds):
        _creds.append(_make_credential(CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, USER_ID))

if _creds:
    # Module-level defaults mirror the first loaded credential.
    CLIENT_ID = _creds[0]["client_id"]
    CLIENT_SECRET = _creds[0]["client_secret"]
    REFRESH_TOKEN = _creds[0]["refresh_token"]


def _pick_credential() -> dict:
    """Return a randomly chosen credential.

    Raises:
        HTTPException: 500 when no credentials were loaded.
    """
    if not _creds:
        raise HTTPException(status_code=500, detail="No Tidal credentials available; populate token.json")
    return random.choice(_creds)


def _lock_for_cred(cred: dict) -> asyncio.Lock:
    """Return the refresh lock for ``cred``, creating it on first use."""
    key = f"{cred['client_id']}:{cred['refresh_token']}"
    lock = _refresh_locks.get(key)
    if lock is None:
        lock = asyncio.Lock()
        _refresh_locks[key] = lock
    return lock


async def get_http_client() -> httpx.AsyncClient:
    """Return the shared HTTP client.

    When the lifespan handler has not run (e.g. direct calls), a fallback
    client is created once and cached in the module global, so repeated calls
    reuse it instead of leaking a new unclosed client each time.
    """
    global _http_client
    if _http_client is None:
        # Fallback for contexts where lifespan is not run (e.g., direct calls)
        _http_client = httpx.AsyncClient(http2=True)
    return _http_client


async def refresh_tidal_token(cred: Optional[dict] = None, stale_token: Optional[str] = None):
    """Refresh a token for the provided credential set.

    Args:
        cred: Credential record to refresh; a random one is picked when omitted.
        stale_token: An access token known to be rejected upstream. A cached
            token equal to it is not reused even if it has not expired yet.

    Returns:
        A usable access token: either a still-valid cached one (refreshed by a
        concurrent caller) or a newly obtained one.

    Raises:
        HTTPException: 401 when the refresh request fails or its response is
            malformed.
    """
    cred = cred or _pick_credential()

    async with _lock_for_cred(cred):
        # Re-check under the lock: another coroutine may have refreshed while
        # this one was waiting. Never hand back the token that was rejected.
        cached = cred["access_token"]
        if cached and cached != stale_token and time.time() < cred["expires_at"]:
            return cached

        try:
            client = await get_http_client()
            res = await client.post(
                "https://auth.tidal.com/v1/oauth2/token",
                data={
                    "client_id": cred["client_id"],
                    "refresh_token": cred["refresh_token"],
                    "grant_type": "refresh_token",
                    "scope": "r_usr+w_usr+w_sub",
                },
                auth=(cred["client_id"], cred["client_secret"]),
            )
            res.raise_for_status()
            data = res.json()
            new_token = data["access_token"]
            expires_in = data.get("expires_in", 3600)
        except httpx.HTTPError as e:
            raise HTTPException(status_code=401, detail=f"Token refresh failed: {str(e)}") from e
        except (KeyError, ValueError) as e:
            # Non-JSON body or a body without "access_token".
            raise HTTPException(status_code=401, detail=f"Token refresh failed: {str(e)}") from e

        cred["access_token"] = new_token
        # Expire the cache 60 seconds early to avoid using a token on the edge.
        cred["expires_at"] = time.time() + expires_in - 60
        return new_token


async def get_tidal_token(force_refresh: bool = False):
    """Return ``(access_token, credential)`` for a randomly picked credential."""
    return await get_tidal_token_for_cred(force_refresh=force_refresh)


async def get_tidal_token_for_cred(
    force_refresh: bool = False,
    cred: Optional[dict] = None,
    stale_token: Optional[str] = None,
):
    """Retrieve an access token for a specific credential; pick random if not provided.

    Args:
        force_refresh: Skip the cache and obtain a token different from the
            rejected one.
        cred: Credential record to use.
        stale_token: The token that was rejected upstream. Only used with
            ``force_refresh``; defaults to the currently cached token.

    Returns:
        Tuple ``(access_token, credential)``.
    """
    cred = cred or _pick_credential()

    if force_refresh:
        # Without a stale token the cached one is treated as rejected, so a
        # forced refresh always results in a new token request.
        rejected = stale_token or cred["access_token"]
        token = await refresh_tidal_token(cred, stale_token=rejected)
        return token, cred

    if cred["access_token"] and time.time() < cred["expires_at"]:
        return cred["access_token"], cred

    token = await refresh_tidal_token(cred)
    return token, cred


async def _get_with_auth_retry(
    url: str,
    params: Optional[dict],
    token: Optional[str],
    cred: Optional[dict],
) -> Tuple[httpx.Response, str, Optional[dict]]:
    """GET ``url`` with a bearer token, refreshing and retrying once on 401.

    Returns the successful response with the token/credential finally used.
    Raises ``httpx.HTTPStatusError`` / ``httpx.RequestError`` on failure.
    """
    if token is None:
        token, cred = await get_tidal_token_for_cred(cred=cred)

    client = await get_http_client()
    headers = {"authorization": f"Bearer {token}"}
    resp = await client.get(url, headers=headers, params=params)

    if resp.status_code == 401:
        # Token expired or revoked: refresh (skipping the rejected token) and retry.
        token, cred = await get_tidal_token_for_cred(
            force_refresh=True, cred=cred, stale_token=token
        )
        headers["authorization"] = f"Bearer {token}"
        resp = await client.get(url, headers=headers, params=params)

    resp.raise_for_status()
    return resp, token, cred


def _http_exception_for_request_error(err: httpx.RequestError) -> HTTPException:
    """Map a transport-level httpx error onto the HTTPException returned to clients."""
    if isinstance(err, httpx.TimeoutException):
        # NOTE(review): timeouts are reported as 429; 504 may be more accurate,
        # but clients may rely on the current status code.
        return HTTPException(status_code=429, detail="Upstream timeout")
    return HTTPException(status_code=503, detail="Connection error to Tidal")


async def make_request(
    url: str,
    token: Optional[str] = None,
    params: Optional[dict] = None,
    cred: Optional[dict] = None,
):
    """Perform an authenticated GET and wrap the JSON body in the API envelope.

    Returns:
        ``{"version": API_VERSION, "data": <upstream JSON>}``.

    Raises:
        HTTPException: 404 for upstream 404, the upstream status code for other
            HTTP errors, 429 on timeout, 503 on other connection errors.
    """
    try:
        resp, _, _ = await _get_with_auth_retry(url, params, token, cred)
        return {"version": API_VERSION, "data": resp.json()}
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            raise HTTPException(status_code=404, detail="Resource not found") from e
        logger.error(
            "Upstream API error %s %s %s",
            e.response.status_code,
            url,
            e.response.text,
            exc_info=e,
        )
        raise HTTPException(status_code=e.response.status_code, detail="Upstream API error") from e
    except httpx.RequestError as e:
        raise _http_exception_for_request_error(e) from e


async def authed_get_json(
    url: str,
    *,
    params: Optional[dict] = None,
    token: Optional[str] = None,
    cred: Optional[dict] = None,
):
    """Perform an authenticated GET, retrying once on 401.

    Returns:
        Tuple ``(payload, token, cred)`` where token/cred reflect any refresh
        that happened during the call.

    Raises:
        HTTPException: 404 / 429 for the matching upstream statuses, the
            upstream status code for other HTTP errors, 429 on timeout, 503 on
            other connection errors.
    """
    try:
        resp, token, cred = await _get_with_auth_retry(url, params, token, cred)
        return resp.json(), token, cred
    except httpx.HTTPStatusError as e:
        status = e.response.status_code
        if status == 404:
            raise HTTPException(status_code=404, detail="Resource not found") from e
        if status == 429:
            raise HTTPException(status_code=429, detail="Upstream rate limited") from e
        raise HTTPException(status_code=status, detail="Upstream API error") from e
    except httpx.RequestError as e:
        raise _http_exception_for_request_error(e) from e


@app.get("/")
async def index():
    """Return the API version and repository link."""
    return {"version": API_VERSION, "Repo": "https://github.com/uimaxbai/hifi-api"}


@app.get("/info/")
async def get_info(id: int):
    """Fetch track metadata for the given track id."""
    url = f"https://api.tidal.com/v1/tracks/{id}/"
    return await make_request(url, params={"countryCode": COUNTRY_CODE})


@app.get("/track/")
async def get_track(id: int, quality: str = "HI_RES_LOSSLESS"):
    """Fetch playback info for a track at the requested audio quality."""
    # NOTE(review): this endpoint uses the tidal.com host while most others
    # use api.tidal.com; left unchanged, confirm it is intentional.
    track_url = f"https://tidal.com/v1/tracks/{id}/playbackinfo"
    params = {
        "audioquality": quality,
        "playbackmode": "STREAM",
        "assetpresentation": "FULL",
    }
    return await make_request(track_url, params=params)


@app.get("/recommendations/")
async def get_recommendations(id: int):
    """Fetch up to 20 recommended tracks for the given track id."""
    recommendations_url = f"https://tidal.com/v1/tracks/{id}/recommendations"
    # Use the configured country like every other endpoint (default is "US").
    params = {"limit": "20", "countryCode": COUNTRY_CODE}
    return await make_request(recommendations_url, params=params)


@app.api_route("/search/", methods=["GET"])
async def search(
    s: Union[str, None] = Query(default=None),
    a: Union[str, None] = Query(default=None),
    al: Union[str, None] = Query(default=None),
    v: Union[str, None] = Query(default=None),
    p: Union[str, None] = Query(default=None),
):
    """Search endpoint supporting track/artist/album/video/playlist queries via distinct params.

    The first non-empty parameter in the order s, a, al, v, p wins.

    Raises:
        HTTPException: 400 when none of the parameters is provided.
    """
    top_hits_url = "https://api.tidal.com/v1/search/top-hits"
    # (query value, endpoint, optional "types" filter) in precedence order.
    candidates = (
        (s, "https://api.tidal.com/v1/search/tracks", None),
        (a, top_hits_url, "ARTISTS,TRACKS"),
        (al, top_hits_url, "ALBUMS"),
        (v, top_hits_url, "VIDEOS"),
        (p, top_hits_url, "PLAYLISTS"),
    )

    for value, url, types in candidates:
        if not value:
            continue
        params = {"query": value, "limit": 25, "offset": 0}
        if types is not None:
            params["types"] = types
        params["countryCode"] = COUNTRY_CODE
        return await make_request(url, params=params)

    raise HTTPException(status_code=400, detail="Provide one of s, a, al, v, or p")


@app.get("/album/")
async def get_album(
    id: int = Query(..., description="Album ID"),
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """Fetch album metadata plus its items, paging items in chunks of up to 100."""
    token, cred = await get_tidal_token_for_cred()

    album_url = f"https://api.tidal.com/v1/albums/{id}"
    items_url = f"https://api.tidal.com/v1/albums/{id}/items"

    async def fetch(url: str, params: Optional[dict] = None):
        # Share token/cred across concurrent fetches so a refresh done by one
        # request is picked up by the ones that start after it.
        nonlocal token, cred
        payload, token, cred = await authed_get_json(
            url,
            params=params,
            token=token,
            cred=cred,
        )
        return payload

    tasks = [fetch(album_url, {"countryCode": COUNTRY_CODE})]

    max_chunk = 100
    end = offset + limit
    for chunk_offset in range(offset, end, max_chunk):
        chunk_size = min(max_chunk, end - chunk_offset)
        tasks.append(
            fetch(items_url, {"countryCode": COUNTRY_CODE, "limit": chunk_size, "offset": chunk_offset})
        )

    album_data, *items_pages = await asyncio.gather(*tasks)

    all_items = []
    for page in items_pages:
        # Fall back to the page itself when it has no "items" key.
        all_items.extend(page.get("items", page))

    album_data["items"] = all_items

    return {
        "version": API_VERSION,
        "data": album_data,
    }


@app.get("/mix/")
async def get_mix(
    id: str = Query(..., description="Mix ID")
):
    """Fetch items from a Tidal mix by its ID."""
    token, cred = await get_tidal_token_for_cred()

    url = "https://api.tidal.com/v1/pages/mix"
    params = {
        "mixId": id,
        "countryCode": COUNTRY_CODE,
        "deviceType": "BROWSER",
    }

    data, _, _ = await authed_get_json(
        url,
        params=params,
        token=token,
        cred=cred,
    )

    header = {}
    items = []

    # Walk every module of every row; the last MIX_HEADER / TRACK_LIST wins.
    for row in data.get("rows", []):
        for module in row.get("modules", []):
            module_type = module.get("type")
            if module_type == "MIX_HEADER":
                header = module.get("mix", {})
            elif module_type == "TRACK_LIST":
                items = module.get("pagedList", {}).get("items", [])

    return {
        "version": API_VERSION,
        "mix": header,
        "items": [item.get("item", item) for item in items],
    }


@app.get("/playlist/")
async def get_playlist(
    id: str = Query(..., min_length=1),
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """Fetch playlist metadata plus items concurrently, using shared client and single token."""
    token, cred = await get_tidal_token_for_cred()

    playlist_url = f"https://api.tidal.com/v1/playlists/{id}"
    items_url = f"https://api.tidal.com/v1/playlists/{id}/items"

    async def fetch(url: str, params: Optional[dict] = None):
        nonlocal token, cred
        payload, token, cred = await authed_get_json(
            url,
            params=params,
            token=token,
            cred=cred,
        )
        return payload

    playlist_data, items_data = await asyncio.gather(
        fetch(playlist_url, {"countryCode": COUNTRY_CODE}),
        fetch(items_url, {"countryCode": COUNTRY_CODE, "limit": limit, "offset": offset}),
    )

    return {
        "version": API_VERSION,
        "playlist": playlist_data,
        "items": items_data.get("items", items_data),
    }


def _extract_uuid_from_tidal_url(href: str) -> Optional[str]:
    """Extract and reconstruct a hyphenated UUID from a Tidal resource URL.

    Joins the 5th to 9th "/"-separated segments of ``href`` with "-". Returns
    ``None`` when ``href`` is empty or has fewer than nine segments.
    """
    parts = href.split("/") if href else []
    return "-".join(parts[4:9]) if len(parts) >= 9 else None


@app.get("/artist/similar/")
async def get_similar_artists(
    id: int = Query(..., description="Artist ID"),
    cursor: Union[int, str, None] = None
):
    """Fetch artists similar to another by its ID using V2 API."""
    url = f"https://openapi.tidal.com/v2/artists/{id}/relationships/similarArtists"
    params = {
        "countryCode": COUNTRY_CODE,
        "include": "similarArtists,similarArtists.profileArt"
    }
    # Only send the cursor when paging; httpx serializes None as an empty value.
    if cursor is not None:
        params["page[cursor]"] = cursor

    payload, _, _ = await authed_get_json(url, params=params)

    # Index the side-loaded resources by id for relationship lookups.
    included = payload.get("included", [])
    artists_map = {i["id"]: i for i in included if i.get("type") == "artists"}
    artworks_map = {i["id"]: i for i in included if i.get("type") == "artworks"}

    def resolve_artist(entry):
        """Flatten one relationship entry into an artist dict."""
        aid = entry["id"]
        inc = artists_map.get(aid, {})
        attr = inc.get("attributes", {})

        # Resolve the picture id from the first file of the first profileArt artwork.
        pic_id = None
        if art_data := inc.get("relationships", {}).get("profileArt", {}).get("data"):
            if artwork := artworks_map.get(art_data[0].get("id")):
                if files := artwork.get("attributes", {}).get("files"):
                    pic_id = _extract_uuid_from_tidal_url(files[0].get("href"))

        return {
            **attr,
            "id": int(aid) if aid.isdigit() else aid,
            "picture": pic_id or attr.get("selectedAlbumCoverFallback"),
            "url": f"http://www.tidal.com/artist/{aid}",
            "relationType": "SIMILAR_ARTIST"
        }

    return {
        "version": API_VERSION,
        "artists": [resolve_artist(e) for e in payload.get("data", [])]
    }


@app.get("/album/similar/")
async def get_similar_albums(
    id: int = Query(..., description="Album ID"),
    cursor: Union[int, str, None] = None
):
    """Fetch albums similar to another by its ID using V2 API."""
    url = f"https://openapi.tidal.com/v2/albums/{id}/relationships/similarAlbums"
    params = {
        "countryCode": COUNTRY_CODE,
        "include": "similarAlbums,similarAlbums.coverArt,similarAlbums.artists"
    }
    # Only send the cursor when paging; httpx serializes None as an empty value.
    if cursor is not None:
        params["page[cursor]"] = cursor

    payload, _, _ = await authed_get_json(url, params=params)

    # Index the side-loaded resources by id for relationship lookups.
    included = payload.get("included", [])
    albums_map = {i["id"]: i for i in included if i.get("type") == "albums"}
    artworks_map = {i["id"]: i for i in included if i.get("type") == "artworks"}
    artists_map = {i["id"]: i for i in included if i.get("type") == "artists"}

    def resolve_album(entry):
        """Flatten one relationship entry into an album dict."""
        aid = entry["id"]
        inc = albums_map.get(aid, {})
        attr = inc.get("attributes", {})
        relationships = inc.get("relationships", {})

        # Resolve the cover id from the first file of the first coverArt artwork.
        cover_id = None
        if art_data := relationships.get("coverArt", {}).get("data"):
            if artwork := artworks_map.get(art_data[0].get("id")):
                if files := artwork.get("attributes", {}).get("files"):
                    cover_id = _extract_uuid_from_tidal_url(files[0].get("href"))

        # Keep only artists that were side-loaded in "included".
        artist_list = []
        for a_entry in relationships.get("artists", {}).get("data") or []:
            if a_obj := artists_map.get(a_entry["id"]):
                a_id = a_obj["id"]
                artist_list.append({
                    "id": int(a_id) if a_id.isdigit() else a_id,
                    "name": a_obj["attributes"]["name"]
                })

        return {
            **attr,
            "id": int(aid) if aid.isdigit() else aid,
            "cover": cover_id,
            "artists": artist_list,
            "url": f"http://www.tidal.com/album/{aid}"
        }

    return {
        "version": API_VERSION,
        "albums": [resolve_album(e) for e in payload.get("data", [])]
    }


@app.get("/artist/")
async def get_artist(
    id: Optional[int] = Query(default=None),
    f: Optional[int] = Query(default=None),
    skip_tracks: bool = Query(default=False),
):
    """Artist detail or album+track aggregation.

    - id: basic artist metadata + cover URLs
    - f: fetch artist albums page and aggregate tracks across albums (capped concurrency)
    - skip_tracks: if true, returns only albums (plus the artist's top tracks)
      without aggregating tracks per album (when using 'f')

    Raises:
        HTTPException: 400 when neither ``id`` nor ``f`` is provided.
    """
    if id is None and f is None:
        raise HTTPException(status_code=400, detail="Provide id or f query param")

    token, cred = await get_tidal_token_for_cred()

    if id is not None:
        artist_url = f"https://api.tidal.com/v1/artists/{id}"
        artist_data, token, cred = await authed_get_json(
            artist_url,
            params={"countryCode": COUNTRY_CODE},
            token=token,
            cred=cred,
        )

        # Fall back to the album cover when the artist has no picture.
        picture = artist_data.get("picture")
        fallback = artist_data.get("selectedAlbumCoverFallback")
        if not picture and fallback:
            artist_data["picture"] = fallback
            picture = fallback

        cover = None
        if picture:
            # Image URLs use the picture id with "-" replaced by "/".
            slug = picture.replace("-", "/")
            cover = {
                "id": artist_data.get("id"),
                "name": artist_data.get("name"),
                "750": f"https://resources.tidal.com/images/{slug}/750x750.jpg",
            }

        return {"version": API_VERSION, "artist": artist_data, "cover": cover}

    # Fetch albums and singles/EPs directly in parallel
    albums_url = f"https://api.tidal.com/v1/artists/{f}/albums"
    common_params = {"countryCode": COUNTRY_CODE, "limit": 100}

    tasks = [
        authed_get_json(albums_url, params=common_params, token=token, cred=cred),
        authed_get_json(albums_url, params={**common_params, "filter": "EPSANDSINGLES"}, token=token, cred=cred),
    ]

    if skip_tracks:
        tasks.append(
            authed_get_json(
                f"https://api.tidal.com/v1/artists/{f}/toptracks",
                params={"countryCode": COUNTRY_CODE, "limit": 15},
                token=token,
                cred=cred
            )
        )

    # Failures are collected rather than raised so partial results are returned.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    unique_releases = []
    seen_ids = set()

    # Process albums (first 2 results)
    for res in results[:2]:
        if isinstance(res, tuple) and len(res) > 0:
            data, token, cred = res  # Update tokens from latest responses
            for item in data.get("items", []):
                if item.get("id") and item["id"] not in seen_ids:
                    unique_releases.append(item)
                    seen_ids.add(item["id"])
        elif isinstance(res, Exception):
            logger.warning("Error fetching artist releases: %s", res)

    album_ids: List[int] = [item["id"] for item in unique_releases]
    page_data = {"items": unique_releases}

    if skip_tracks:
        top_tracks = []
        if len(results) > 2:
            res = results[2]
            if isinstance(res, tuple) and len(res) > 0:
                data, token, cred = res
                top_tracks = data.get("items", [])
            elif isinstance(res, Exception):
                logger.warning("Error fetching top tracks: %s", res)
        return {"version": API_VERSION, "albums": page_data, "tracks": top_tracks}

    if not album_ids:
        return {"version": API_VERSION, "albums": page_data, "tracks": []}

    # Cap the number of concurrent album-page requests.
    sem = asyncio.Semaphore(6)

    async def fetch_album_tracks(album_id: int):
        """Return the track list of one album from its album page."""
        nonlocal token, cred
        async with sem:
            album_data, token, cred = await authed_get_json(
                "https://api.tidal.com/v1/pages/album",
                params={
                    "albumId": album_id,
                    "countryCode": COUNTRY_CODE,
                    "deviceType": "BROWSER",
                },
                token=token,
                cred=cred,
            )

        # The track list is read from the first module of the second row.
        rows = album_data.get("rows", [])
        if len(rows) < 2:
            return []
        modules = rows[1].get("modules", [])
        if not modules:
            return []
        items = modules[0].get("pagedList", {}).get("items", [])
        return [track.get("item", track) for track in items]

    results = await asyncio.gather(
        *(fetch_album_tracks(album_id) for album_id in album_ids),
        return_exceptions=True,
    )

    tracks: List[dict] = []
    for res in results:
        if isinstance(res, Exception):
            # Best effort: skip albums that failed, but leave a trace.
            logger.warning("Error fetching album tracks: %s", res)
            continue
        tracks.extend(res)

    return {"version": API_VERSION, "albums": page_data, "tracks": tracks}


@app.get("/cover/")
async def get_cover(
    id: Optional[int] = Query(default=None),
    q: Optional[str] = Query(default=None),
):
    """Fetch album cover data for a track id or search query.

    Raises:
        HTTPException: 400 when neither ``id`` nor ``q`` is provided; 404 when
            no cover could be found.
    """
    if id is None and q is None:
        raise HTTPException(status_code=400, detail="Provide id or q query param")

    token, cred = await get_tidal_token_for_cred()

    def build_cover_entry(cover_slug: str, name: Optional[str], track_id: Optional[int]):
        """Build the cover record with image URLs in three sizes."""
        # Image URLs use the cover id with "-" replaced by "/".
        slug = cover_slug.replace("-", "/")
        return {
            "id": track_id,
            "name": name,
            "1280": f"https://resources.tidal.com/images/{slug}/1280x1280.jpg",
            "640": f"https://resources.tidal.com/images/{slug}/640x640.jpg",
            "80": f"https://resources.tidal.com/images/{slug}/80x80.jpg",
        }

    if id is not None:
        track_data, token, cred = await authed_get_json(
            f"https://api.tidal.com/v1/tracks/{id}/",
            params={"countryCode": COUNTRY_CODE},
            token=token,
            cred=cred,
        )

        album = track_data.get("album") or {}
        cover_slug = album.get("cover")
        if not cover_slug:
            raise HTTPException(status_code=404, detail="Cover not found")

        entry = build_cover_entry(
            cover_slug,
            album.get("title") or track_data.get("title"),
            album.get("id") or id,
        )
        return {"version": API_VERSION, "covers": [entry]}

    search_data, token, cred = await authed_get_json(
        "https://api.tidal.com/v1/search/tracks",
        params={"countryCode": COUNTRY_CODE, "query": q, "limit": 10},
        token=token,
        cred=cred,
    )

    items = search_data.get("items", [])[:10]
    if not items:
        raise HTTPException(status_code=404, detail="Cover not found")

    covers = []
    for track in items:
        album = track.get("album") or {}
        cover_slug = album.get("cover")
        if not cover_slug:
            # Tracks without an album cover are skipped.
            continue
        covers.append(
            build_cover_entry(
                cover_slug,
                track.get("title"),
                track.get("id"),
            )
        )

    if not covers:
        raise HTTPException(status_code=404, detail="Cover not found")

    return {"version": API_VERSION, "covers": covers}


@app.get("/lyrics/")
async def get_lyrics(id: int):
    """Fetch lyrics for a track.

    Raises:
        HTTPException: 404 when the upstream payload is empty.
    """
    url = f"https://api.tidal.com/v1/tracks/{id}/lyrics"
    data, _, _ = await authed_get_json(
        url,
        params={"countryCode": COUNTRY_CODE, "locale": "en_US", "deviceType": "BROWSER"},
    )

    if not data:
        raise HTTPException(status_code=404, detail="Lyrics not found")

    return {"version": API_VERSION, "lyrics": data}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)