|
from fastapi import FastAPI, HTTPException, Request |
|
from fastapi.responses import HTMLResponse |
|
from fastapi.staticfiles import StaticFiles |
|
from fastapi.templating import Jinja2Templates |
|
from pydantic import BaseModel |
|
from ytmusicapi import YTMusic |
|
import os |
|
import logging |
|
import requests |
|
from datetime import datetime, timedelta |
|
from collections import defaultdict |
|
import time |
|
import asyncio |
|
import cloudscraper |
|
from urllib.parse import urlparse, parse_qs |
|
from collections import defaultdict |
|
import threading |
|
from typing import Optional, Dict, Any |
|
|
|
app = FastAPI() |
|
|
|
|
|
app.mount("/static", StaticFiles(directory="static"), name="static") |
|
templates = Jinja2Templates(directory="templates") |
|
|
|
ytmusic = YTMusic() |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class SearchRequest(BaseModel): |
|
query: str |
|
|
|
class MatchRequest(BaseModel): |
|
url: str |
|
|
|
class TrackDownloadRequest(BaseModel): |
|
track_id: str |
|
quality: str = "128" |
|
|
|
class MatchResponse(BaseModel): |
|
url: str |
|
filename: str |
|
track_id: str |
|
|
|
class ErrorResponse(BaseModel): |
|
detail: str |
|
premium: Optional[str] = None |
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
async def index(request: Request): |
|
return templates.TemplateResponse("index.html", {"request": request}) |
|
|
|
@app.post("/search") |
|
async def search(request: SearchRequest): |
|
search_results = ytmusic.search(request.query, filter="songs") |
|
return search_results |
|
|
|
@app.post("/searcht") |
|
async def searcht(request: SearchRequest): |
|
logger.info(f"search query: {request.query}") |
|
search_results = ytmusic.search(request.query, filter="songs") |
|
first_song = next((song for song in search_results if 'videoId' in song and song['videoId']), {}) if search_results else {} |
|
return first_song |
|
|
|
def extract_amazon_track_id(url: str) -> Optional[str]: |
|
""" |
|
Extracts track ID from various Amazon Music URL formats. |
|
""" |
|
if "music.amazon.com" not in url: |
|
return None |
|
|
|
parsed_url = urlparse(url) |
|
query_params = parse_qs(parsed_url.query) |
|
|
|
if "trackAsin" in query_params: |
|
return query_params["trackAsin"][0] |
|
|
|
path_parts = parsed_url.path.split('/') |
|
if "tracks" in path_parts: |
|
try: |
|
track_id_index = path_parts.index("tracks") + 1 |
|
if track_id_index < len(path_parts): |
|
return path_parts[track_id_index] |
|
except (ValueError, IndexError): |
|
pass |
|
|
|
logger.warning(f"Could not extract Amazon track ID from URL: {url}") |
|
return None |
|
|
|
def get_song_link_info(url: str) -> Optional[Dict[str, Any]]: |
|
""" |
|
Fetches track information from the Song.link API. |
|
Uses requests.get() which is a blocking call. |
|
""" |
|
api_base_url = "https://api.song.link/v1-alpha.1/links" |
|
params = {"userCountry": "US"} |
|
|
|
if "music.amazon.com" in url: |
|
track_id = extract_amazon_track_id(url) |
|
if track_id: |
|
params["platform"] = "amazonMusic" |
|
params["id"] = track_id |
|
params["type"] = "song" |
|
else: |
|
params["url"] = url |
|
else: |
|
params["url"] = url |
|
|
|
try: |
|
logger.info(f"Querying Song.link API with params: {params}") |
|
response = requests.get(api_base_url, params=params, timeout=10) |
|
response.raise_for_status() |
|
return response.json() |
|
except requests.exceptions.RequestException as e: |
|
logger.error(f"Error fetching from Song.link API: {e}") |
|
return None |
|
|
|
def extract_url(links_by_platform: dict, platform: str) -> Optional[str]: |
|
""" |
|
Extracts a specific platform URL from Song.link API response. |
|
""" |
|
if platform in links_by_platform and links_by_platform[platform].get("url"): |
|
return links_by_platform[platform]["url"] |
|
|
|
logger.warning(f"No URL found for platform '{platform}' in links: {links_by_platform.keys()}") |
|
return None |
|
|
|
@app.post("/match", response_model=MatchResponse) |
|
async def match(request: MatchRequest): |
|
""" |
|
Matches a given music track URL to a YouTube Music URL. |
|
""" |
|
track_url = request.url |
|
logger.info(f"Match endpoint: Processing URL: {track_url}") |
|
|
|
track_info = get_song_link_info(track_url) |
|
if not track_info: |
|
logger.error(f"Match endpoint: Could not fetch track info for URL: {track_url}") |
|
raise HTTPException(status_code=404, detail="Could not fetch track info from Song.link API.") |
|
|
|
entity_unique_id = track_info.get("entityUniqueId") |
|
title = None |
|
artist = None |
|
|
|
if entity_unique_id and entity_unique_id in track_info.get("entitiesByUniqueId", {}): |
|
main_entity = track_info["entitiesByUniqueId"][entity_unique_id] |
|
title = main_entity.get("title") |
|
artist = main_entity.get("artistName") |
|
logger.info(f"Match endpoint: Found main entity - Title: '{title}', Artist: '{artist}'") |
|
else: |
|
logger.warning(f"Match endpoint: Could not find main entity details for {track_url} using entityUniqueId: {entity_unique_id}") |
|
|
|
|
|
for entity_id, entity_data in track_info.get("entitiesByUniqueId", {}).items(): |
|
if entity_data.get("title") and entity_data.get("artistName"): |
|
title = entity_data.get("title") |
|
artist = entity_data.get("artistName") |
|
logger.info(f"Match endpoint: Using fallback entity - Title: '{title}', Artist: '{artist}' from entity ID {entity_id}") |
|
break |
|
|
|
if not title or not artist: |
|
logger.error(f"Match endpoint: Could not determine title and artist for URL: {track_url}") |
|
raise HTTPException(status_code=404, detail="Could not determine title and artist from Song.link info.") |
|
|
|
youtube_url = extract_url(track_info.get("linksByPlatform", {}), "youtube") |
|
|
|
if youtube_url: |
|
video_id = None |
|
|
|
if "v=" in youtube_url: |
|
video_id = youtube_url.split("v=")[1].split("&")[0] |
|
elif "youtu.be/" in youtube_url: |
|
video_id = youtube_url.split("youtu.be/")[1].split("?")[0] |
|
|
|
filename = f"{title} - {artist}" if title and artist else "Unknown Track - Unknown Artist" |
|
logger.info(f"Match endpoint: Found direct YouTube URL: {youtube_url}, Video ID: {video_id}") |
|
|
|
return MatchResponse(url=youtube_url, filename=filename, track_id=video_id) |
|
else: |
|
logger.info(f"Match endpoint: No direct YouTube URL. Searching YTMusic with: '{title} - {artist}'") |
|
|
|
search_query = f'{title} {artist}' |
|
search_results = ytmusic.search(search_query, filter="songs") |
|
|
|
if search_results: |
|
first_song = next((song for song in search_results if song.get('videoId')), None) |
|
|
|
if first_song and first_song.get('videoId'): |
|
video_id = first_song["videoId"] |
|
ym_url = f'https://music.youtube.com/watch?v={video_id}' |
|
|
|
|
|
artist_name = artist |
|
if first_song.get('artists') and len(first_song['artists']) > 0: |
|
artist_name = first_song['artists'][0]['name'] |
|
|
|
filename = f"{first_song.get('title', title)} - {artist_name}" |
|
logger.info(f"Match endpoint: Found YTMusic search result - URL: {ym_url}, Video ID: {video_id}") |
|
|
|
return MatchResponse(filename=filename, url=ym_url, track_id=video_id) |
|
else: |
|
logger.error(f"Match endpoint: YTMusic search for '{search_query}' yielded no results with a videoId.") |
|
raise HTTPException(status_code=404, detail="No matching video ID found on YouTube Music after search.") |
|
else: |
|
logger.error(f"Match endpoint: YTMusic search for '{search_query}' yielded no results.") |
|
raise HTTPException(status_code=404, detail="No results found on YouTube Music for the track.") |
|
|
|
class ApiRotator: |
|
def __init__(self, apis): |
|
self.apis = apis |
|
self.last_successful_index = None |
|
|
|
def get_prioritized_apis(self): |
|
if self.last_successful_index is not None: |
|
rotated_apis = ( |
|
[self.apis[self.last_successful_index]] + |
|
self.apis[:self.last_successful_index] + |
|
self.apis[self.last_successful_index+1:] |
|
) |
|
return rotated_apis |
|
return self.apis |
|
|
|
def update_last_successful(self, index): |
|
self.last_successful_index = index |
|
|
|
api_rotator = ApiRotator([ |
|
"https://dwnld.nichind.dev", |
|
"https://yt.edd1e.xyz/", |
|
"http://34.107.254.11" |
|
]) |
|
|
|
async def get_track_download_url(track_id: str, quality: str) -> str: |
|
apis = api_rotator.get_prioritized_apis() |
|
session = cloudscraper.create_scraper() |
|
|
|
headers = { |
|
"Accept": "application/json", |
|
"Content-Type": "application/json", |
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" |
|
} |
|
|
|
for i, api_url in enumerate(apis): |
|
try: |
|
logger.info(f"Attempting to get download URL from: {api_url}") |
|
y_url = f"https://youtu.be/{track_id}" |
|
|
|
|
|
loop = asyncio.get_event_loop() |
|
response = await loop.run_in_executor( |
|
None, |
|
lambda: session.post( |
|
api_url, |
|
timeout=20, |
|
json={"url": y_url, "audioFormat": "mp3", "downloadMode": "audio", "audioBitrate": quality}, |
|
headers=headers |
|
) |
|
) |
|
|
|
logger.info(f"Response status: {response.status_code}") |
|
logger.info(f"Response content: {response.content}") |
|
|
|
if response.headers.get('content-type', '').startswith('application/json'): |
|
json_response = response.json() |
|
error_code = json_response.get("error", {}).get("code", "") |
|
|
|
if error_code == "error.api.content.video.unavailable": |
|
logger.warning(f"Video unavailable error from {api_url}") |
|
break |
|
|
|
if "url" in json_response: |
|
api_rotator.update_last_successful(i) |
|
return json_response["url"] |
|
|
|
except Exception as e: |
|
logger.error(f"Failed with {api_url}: {str(e)}") |
|
continue |
|
|
|
logger.error(f"No download URL found") |
|
return "" |
|
|
|
|
|
|
|
class RateLimiter: |
|
def __init__(self, max_requests: int, time_window: timedelta): |
|
self.max_requests = max_requests |
|
self.time_window = time_window |
|
self.requests: Dict[str, list] = defaultdict(list) |
|
|
|
def _cleanup_old_requests(self, user_ip: str) -> None: |
|
"""Remove requests that are outside the time window.""" |
|
current_time = time.time() |
|
self.requests[user_ip] = [ |
|
timestamp for timestamp in self.requests[user_ip] |
|
if current_time - timestamp < self.time_window.total_seconds() |
|
] |
|
|
|
def is_rate_limited(self, user_ip: str) -> bool: |
|
"""Check if the user has exceeded their rate limit.""" |
|
self._cleanup_old_requests(user_ip) |
|
|
|
|
|
current_count = len(self.requests[user_ip]) |
|
|
|
|
|
current_time = time.time() |
|
self.requests[user_ip].append(current_time) |
|
|
|
|
|
return (current_count + 1) > self.max_requests |
|
|
|
def get_current_count(self, user_ip: str) -> int: |
|
"""Get the current request count for an IP.""" |
|
self._cleanup_old_requests(user_ip) |
|
return len(self.requests[user_ip]) |
|
|
|
|
|
|
|
rate_limiter = RateLimiter( |
|
max_requests=6, |
|
time_window=timedelta(days=1) |
|
) |
|
|
|
def get_user_ip(request: Request) -> str: |
|
"""Helper function to get user's IP address.""" |
|
forwarded = request.headers.get("X-Forwarded-For") |
|
if forwarded: |
|
return forwarded.split(",")[0] |
|
return request.client.host |
|
|
|
|
|
class ApiRotator: |
|
def __init__(self, apis): |
|
self.apis = apis |
|
self.last_successful_index = None |
|
|
|
def get_prioritized_apis(self): |
|
if self.last_successful_index is not None: |
|
|
|
rotated_apis = ( |
|
[self.apis[self.last_successful_index]] + |
|
self.apis[:self.last_successful_index] + |
|
self.apis[self.last_successful_index+1:] |
|
) |
|
return rotated_apis |
|
return self.apis |
|
|
|
def update_last_successful(self, index): |
|
self.last_successful_index = index |
|
|
|
@app.post("/track_dl") |
|
async def track_dl(request: TrackDownloadRequest, req: Request): |
|
user_ip = get_user_ip(req) |
|
|
|
if rate_limiter.is_rate_limited(user_ip): |
|
current_count = rate_limiter.get_current_count(user_ip) |
|
raise HTTPException( |
|
status_code=429, |
|
detail={ |
|
"error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.", |
|
"url": "https://t.me/chrunoss" |
|
} |
|
) |
|
try: |
|
quality_num = int(request.quality) |
|
if quality_num > 128 or request.quality.upper() == 'FLAC': |
|
raise HTTPException( |
|
status_code=400, |
|
detail={ |
|
"error": "Quality above 128 or FLAC is for Premium users Only.", |
|
"premium": "https://chrunos.com/premium-shortcuts/" |
|
} |
|
) |
|
|
|
dl_url = await get_track_download_url(request.track_id, request.quality) |
|
|
|
if dl_url and "http" in dl_url: |
|
return { |
|
"url": dl_url, |
|
"premium": "https://chrunos.com/premium-shortcuts/" |
|
} |
|
else: |
|
raise HTTPException( |
|
status_code=400, |
|
detail={ |
|
"error": "Failed to Fetch the Track.", |
|
"premium": "https://chrunos.com/premium-shortcuts/" |
|
} |
|
) |
|
|
|
except ValueError: |
|
raise HTTPException( |
|
status_code=400, |
|
detail={ |
|
"error": "Invalid quality value provided. It should be a valid integer or FLAC.", |
|
"premium": "https://chrunos.com/premium-shortcuts/" |
|
} |
|
) |
|
|
|
@app.get("/get_artist") |
|
async def get_artist(id: str): |
|
artist_info = ytmusic.get_artist(id) |
|
return artist_info |
|
|
|
@app.get("/get_album") |
|
async def get_album(id: str): |
|
album_info = ytmusic.get_album(id) |
|
return album_info |
|
|
|
@app.get("/get_song") |
|
async def get_song(id: str): |
|
song_info = ytmusic.get_song(id) |
|
return song_info |
|
|
|
if __name__ == '__main__': |
|
import uvicorn |
|
uvicorn.run(app, host='0.0.0.0', port=7860) |
|
|