|
from fastapi import FastAPI, HTTPException |
|
from deezspot.deezloader import DeeLogin |
|
import requests |
|
import os |
|
import logging |
|
from typing import Optional |
|
from fastapi.staticfiles import StaticFiles |
|
|
|
|
|
# Base URL of the public Deezer REST API used for metadata and search.
DEEZER_API_URL = "https://api.deezer.com"

# Directory that holds downloaded audio and is served back over HTTP.
_DOWNLOADS_DIR = "downloads"

# Root logging at INFO, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Deezer API")

# Create the directory before handing it to StaticFiles, then expose it
# under the /downloads URL prefix.
os.makedirs(_DOWNLOADS_DIR, exist_ok=True)
app.mount(
    "/downloads",
    StaticFiles(directory=_DOWNLOADS_DIR),
    name="downloads",
)
|
|
|
|
|
# SECURITY: the literal below is a credential committed to source control.
# Anyone with read access to this file can use it. Prefer supplying the token
# through the DEEZER_ARL_TOKEN environment variable; the literal is kept only
# so existing deployments keep working and should be rotated and removed.
_FALLBACK_ARL_TOKEN = "9850d663715d56830e6cdb4d28d19491d8c9d9a8ee31c160a0f5e06116b6d8035fb01c5323ec9690e49a32c0580c0a84e484366df2d6a8ac5786d30a95dc660771fbb372735cb2b39d4081bf30284f08319c0f73f6ad34d3d6bcb4449226877c"

# An unset or empty environment variable falls back to the legacy literal.
ARL_TOKEN = os.environ.get("DEEZER_ARL_TOKEN") or _FALLBACK_ARL_TOKEN
if ARL_TOKEN == _FALLBACK_ARL_TOKEN:
    logger.warning(
        "DEEZER_ARL_TOKEN is not set; using the ARL token hard-coded in source"
    )

# Shared deezspot client used by the download endpoint.
dl = DeeLogin(arl=ARL_TOKEN)
|
|
|
@app.get("/")
def read_root():
    """Return a short usage hint for the API root."""
    usage_hint = "Deezer API Endpoints - Use /track/{track_id}"
    return {"message": usage_hint}
|
|
|
|
|
@app.get("/track/{track_id}")
def get_track(track_id: str):
    """Return Deezer's metadata for a single track.

    Args:
        track_id: Track identifier taken from the URL path and forwarded to
            ``{DEEZER_API_URL}/track/{track_id}``.

    Returns:
        The decoded JSON body of the upstream response.

    Raises:
        HTTPException: 404 when the upstream call answers with a non-200
            status; 500 for any other failure (network error, timeout,
            undecodable body).
    """
    try:
        # Bounded wait so a stalled upstream cannot hang this worker forever.
        response = requests.get(
            f"{DEEZER_API_URL}/track/{track_id}", timeout=10
        )
        if response.status_code != 200:
            raise HTTPException(status_code=404, detail="Track not found")
        return response.json()
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the client as-is
        # rather than being rewrapped as a 500 by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"Error fetching track metadata: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
def _locate_downloaded_file(expected_path, track_title, artist_name):
    """Return the path of the downloaded file, or None if none can be found.

    Checks ``expected_path`` first. If it does not exist, scans the top level
    of the ``downloads`` directory for the first entry whose name contains
    both the track title and the artist name (case-insensitive).
    """
    if os.path.exists(expected_path):
        return expected_path

    logger.warning(f"File not found at expected path: {expected_path}")
    logger.warning("Searching for downloaded files in the downloads directory...")
    downloaded_files = os.listdir("downloads")
    logger.warning(f"Files in downloads directory: {downloaded_files}")

    # Lower-case the search keys once instead of once per directory entry.
    title_key = track_title.lower()
    artist_key = artist_name.lower()
    for candidate in downloaded_files:
        lowered = candidate.lower()
        if title_key in lowered and artist_key in lowered:
            match = os.path.join("downloads", candidate)
            logger.info(f"Found matching file: {match}")
            return match
    return None


@app.post("/download/track/{track_id}")
def download_track(track_id: str, quality: str = "MP3_320"):
    """Download a track through deezspot and return its static URL.

    Args:
        track_id: Track identifier taken from the URL path; used to look up
            the track's metadata at ``{DEEZER_API_URL}/track/{track_id}``.
        quality: Value passed through as ``quality_download`` to
            ``dl.download_trackdee``.

    Returns:
        ``{"download_url": "/downloads/<file name>"}`` for the located file.

    Raises:
        HTTPException: 404 when the metadata has no ``link`` field; 500 when
            no matching file is found after the download, or on any other
            failure (network error, timeout, downloader error).
    """
    try:
        # Bounded wait so a stalled upstream cannot hang this worker forever.
        track_info = requests.get(
            f"{DEEZER_API_URL}/track/{track_id}", timeout=10
        ).json()
        track_link = track_info.get("link")
        if not track_link:
            raise HTTPException(status_code=404, detail="Track link not found")

        # Build the file name we expect the downloader to produce; "/" is
        # replaced so the name stays a single path component.
        track_title = track_info.get("title", "track")
        artist_name = track_info.get("artist", {}).get("name", "unknown")
        filename = f"{artist_name} - {track_title}.mp3".replace("/", "_")
        expected_path = os.path.join("downloads", filename)

        logger.info(f"Downloading track: {filename}")
        dl.download_trackdee(
            link_track=track_link,
            output_dir="downloads",
            quality_download=quality,
            recursive_quality=False,
            recursive_download=False,
        )

        # The downloader may use a different file name than the one guessed
        # above, so fall back to a name-based scan of the directory.
        filepath = _locate_downloaded_file(expected_path, track_title, artist_name)
        if filepath is None:
            raise HTTPException(status_code=500, detail="File download failed")

        download_url = f"/downloads/{os.path.basename(filepath)}"
        logger.info(f"Download successful: {download_url}")
        return {"download_url": download_url}
    except HTTPException:
        # Keep the intended status code and detail (404 / 500 above) instead
        # of letting the generic handler rewrap them as a 500.
        raise
    except Exception as e:
        logger.error(f"Error downloading track: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
@app.get("/search")
def search_tracks(query: str, limit: Optional[int] = 10):
    """Proxy a search to ``{DEEZER_API_URL}/search``.

    Args:
        query: Search text, forwarded as the ``q`` query parameter.
        limit: Forwarded as the ``limit`` query parameter; defaults to 10.

    Returns:
        The decoded JSON body of the upstream response.

    Raises:
        HTTPException: 500 on any failure (network error, timeout,
            undecodable body).
    """
    try:
        # Bounded wait so a stalled upstream cannot hang this worker forever.
        response = requests.get(
            f"{DEEZER_API_URL}/search",
            params={"q": query, "limit": limit},
            timeout=10,
        )
        return response.json()
    except Exception as e:
        logger.error(f"Error searching tracks: {e}")
        raise HTTPException(status_code=500, detail=str(e))