File size: 15,159 Bytes
a7c3b28
 
 
 
 
 
7e58923
a7c3b28
 
 
 
4bc7387
63e3048
a7c3b28
 
 
 
 
6a45bf5
a7c3b28
7e58923
a7c3b28
 
 
f94628b
a7c3b28
 
 
 
 
 
 
 
 
 
 
4a7e269
 
a7c3b28
 
 
 
 
 
 
 
4a7e269
 
a7c3b28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fc67add
a7c3b28
 
 
 
fc67add
 
a7c3b28
229ee72
 
a7c3b28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97e71fe
a7c3b28
 
 
 
 
 
 
 
 
 
 
 
 
 
4a7e269
a7c3b28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4a7e269
a7c3b28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4a7e269
a7c3b28
 
 
 
 
 
 
4a7e269
a7c3b28
 
 
97e71fe
a7c3b28
 
97e71fe
a7c3b28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4bc7387
a7c3b28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4a7e269
a7c3b28
 
 
 
2cb5c72
a7c3b28
 
 
2cb5c72
a7c3b28
 
2cb5c72
a7c3b28
 
 
 
 
 
 
97e71fe
a7c3b28
 
 
 
 
 
 
97e71fe
a7c3b28
 
 
 
 
 
 
 
 
 
 
 
 
97e71fe
a7c3b28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2cb5c72
a7c3b28
 
 
 
 
 
 
 
 
a53021c
a7c3b28
 
 
 
 
 
 
 
 
 
 
c524eff
a7c3b28
 
 
 
 
229ee72
a7c3b28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74078ba
a7c3b28
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from ytmusicapi import YTMusic
import os
import logging
import requests
from datetime import datetime, timedelta
from collections import defaultdict
import time
import asyncio
import cloudscraper
from urllib.parse import urlparse, parse_qs
from collections import defaultdict
import threading
from typing import Optional, Dict, Any

app = FastAPI()

# Mount static files and templates
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

ytmusic = YTMusic()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pydantic models for request/response validation
class SearchRequest(BaseModel):
    query: str

class MatchRequest(BaseModel):
    url: str

class TrackDownloadRequest(BaseModel):
    track_id: str
    quality: str = "128"

class MatchResponse(BaseModel):
    url: str
    filename: str
    track_id: str

class ErrorResponse(BaseModel):
    detail: str
    premium: Optional[str] = None

@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})

@app.post("/search")
async def search(request: SearchRequest):
    search_results = ytmusic.search(request.query, filter="songs")
    return search_results

@app.post("/searcht")
async def searcht(request: SearchRequest):
    logger.info(f"search query: {request.query}")
    search_results = ytmusic.search(request.query, filter="songs")
    first_song = next((song for song in search_results if 'videoId' in song and song['videoId']), {}) if search_results else {}
    return first_song

def extract_amazon_track_id(url: str) -> Optional[str]:
    """
    Extracts track ID from various Amazon Music URL formats.
    """
    if "music.amazon.com" not in url:
        return None

    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)

    if "trackAsin" in query_params:
        return query_params["trackAsin"][0]

    path_parts = parsed_url.path.split('/')
    if "tracks" in path_parts:
        try:
            track_id_index = path_parts.index("tracks") + 1
            if track_id_index < len(path_parts):
                return path_parts[track_id_index]
        except (ValueError, IndexError):
            pass

    logger.warning(f"Could not extract Amazon track ID from URL: {url}")
    return None

def get_song_link_info(url: str) -> Optional[Dict[str, Any]]:
    """
    Fetches track information from the Song.link API.
    Uses requests.get() which is a blocking call.
    """
    api_base_url = "https://api.song.link/v1-alpha.1/links"
    params = {"userCountry": "US"}

    if "music.amazon.com" in url:
        track_id = extract_amazon_track_id(url)
        if track_id:
            params["platform"] = "amazonMusic"
            params["id"] = track_id
            params["type"] = "song"
        else:
            params["url"] = url
    else:
        params["url"] = url

    try:
        logger.info(f"Querying Song.link API with params: {params}")
        response = requests.get(api_base_url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching from Song.link API: {e}")
        return None

def extract_url(links_by_platform: dict, platform: str) -> Optional[str]:
    """
    Extracts a specific platform URL from Song.link API response.
    """
    if platform in links_by_platform and links_by_platform[platform].get("url"):
        return links_by_platform[platform]["url"]
    
    logger.warning(f"No URL found for platform '{platform}' in links: {links_by_platform.keys()}")
    return None

@app.post("/match", response_model=MatchResponse)
async def match(request: MatchRequest):
    """
    Matches a given music track URL to a YouTube Music URL.
    """
    track_url = request.url
    logger.info(f"Match endpoint: Processing URL: {track_url}")

    track_info = get_song_link_info(track_url)
    if not track_info:
        logger.error(f"Match endpoint: Could not fetch track info for URL: {track_url}")
        raise HTTPException(status_code=404, detail="Could not fetch track info from Song.link API.")

    entity_unique_id = track_info.get("entityUniqueId")
    title = None
    artist = None

    if entity_unique_id and entity_unique_id in track_info.get("entitiesByUniqueId", {}):
        main_entity = track_info["entitiesByUniqueId"][entity_unique_id]
        title = main_entity.get("title")
        artist = main_entity.get("artistName")
        logger.info(f"Match endpoint: Found main entity - Title: '{title}', Artist: '{artist}'")
    else:
        logger.warning(f"Match endpoint: Could not find main entity details for {track_url} using entityUniqueId: {entity_unique_id}")
        
        # Fallback logic to find title/artist from other entities
        for entity_id, entity_data in track_info.get("entitiesByUniqueId", {}).items():
            if entity_data.get("title") and entity_data.get("artistName"):
                title = entity_data.get("title")
                artist = entity_data.get("artistName")
                logger.info(f"Match endpoint: Using fallback entity - Title: '{title}', Artist: '{artist}' from entity ID {entity_id}")
                break

    if not title or not artist:
        logger.error(f"Match endpoint: Could not determine title and artist for URL: {track_url}")
        raise HTTPException(status_code=404, detail="Could not determine title and artist from Song.link info.")

    youtube_url = extract_url(track_info.get("linksByPlatform", {}), "youtube")
    
    if youtube_url:
        video_id = None
        
        if "v=" in youtube_url:
            video_id = youtube_url.split("v=")[1].split("&")[0]
        elif "youtu.be/" in youtube_url:
            video_id = youtube_url.split("youtu.be/")[1].split("?")[0]

        filename = f"{title} - {artist}" if title and artist else "Unknown Track - Unknown Artist"
        logger.info(f"Match endpoint: Found direct YouTube URL: {youtube_url}, Video ID: {video_id}")
        
        return MatchResponse(url=youtube_url, filename=filename, track_id=video_id)
    else:
        logger.info(f"Match endpoint: No direct YouTube URL. Searching YTMusic with: '{title} - {artist}'")
        
        search_query = f'{title} {artist}'
        search_results = ytmusic.search(search_query, filter="songs")
        
        if search_results:
            first_song = next((song for song in search_results if song.get('videoId')), None)
            
            if first_song and first_song.get('videoId'):
                video_id = first_song["videoId"]
                ym_url = f'https://music.youtube.com/watch?v={video_id}'
                
                # Get artist name safely
                artist_name = artist
                if first_song.get('artists') and len(first_song['artists']) > 0:
                    artist_name = first_song['artists'][0]['name']
                
                filename = f"{first_song.get('title', title)} - {artist_name}"
                logger.info(f"Match endpoint: Found YTMusic search result - URL: {ym_url}, Video ID: {video_id}")
                
                return MatchResponse(filename=filename, url=ym_url, track_id=video_id)
            else:
                logger.error(f"Match endpoint: YTMusic search for '{search_query}' yielded no results with a videoId.")
                raise HTTPException(status_code=404, detail="No matching video ID found on YouTube Music after search.")
        else:
            logger.error(f"Match endpoint: YTMusic search for '{search_query}' yielded no results.")
            raise HTTPException(status_code=404, detail="No results found on YouTube Music for the track.")

class ApiRotator:
    def __init__(self, apis):
        self.apis = apis
        self.last_successful_index = None

    def get_prioritized_apis(self):
        if self.last_successful_index is not None:
            rotated_apis = (
                [self.apis[self.last_successful_index]] +
                self.apis[:self.last_successful_index] +
                self.apis[self.last_successful_index+1:]
            )
            return rotated_apis
        return self.apis

    def update_last_successful(self, index):
        self.last_successful_index = index

api_rotator = ApiRotator([
    "https://dwnld.nichind.dev",
    "https://yt.edd1e.xyz/",
    "http://34.107.254.11"
])

async def get_track_download_url(track_id: str, quality: str) -> str:
    apis = api_rotator.get_prioritized_apis()
    session = cloudscraper.create_scraper()
    
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }

    for i, api_url in enumerate(apis):
        try:
            logger.info(f"Attempting to get download URL from: {api_url}")
            y_url = f"https://youtu.be/{track_id}"
            
            # Use asyncio to run the blocking request in a thread pool
            loop = asyncio.get_event_loop()
            response = await loop.run_in_executor(
                None,
                lambda: session.post(
                    api_url,
                    timeout=20,
                    json={"url": y_url, "audioFormat": "mp3", "downloadMode": "audio", "audioBitrate": quality},
                    headers=headers
                )
            )

            logger.info(f"Response status: {response.status_code}")
            logger.info(f"Response content: {response.content}")

            if response.headers.get('content-type', '').startswith('application/json'):
                json_response = response.json()
                error_code = json_response.get("error", {}).get("code", "")
                
                if error_code == "error.api.content.video.unavailable":
                    logger.warning(f"Video unavailable error from {api_url}")
                    break
                
                if "url" in json_response:
                    api_rotator.update_last_successful(i)
                    return json_response["url"]

        except Exception as e:
            logger.error(f"Failed with {api_url}: {str(e)}")
            continue

    logger.error(f"No download URL found")
    return ""


# Rate limiting dictionary
class RateLimiter:
    def __init__(self, max_requests: int, time_window: timedelta):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests: Dict[str, list] = defaultdict(list)
    
    def _cleanup_old_requests(self, user_ip: str) -> None:
        """Remove requests that are outside the time window."""
        current_time = time.time()
        self.requests[user_ip] = [
            timestamp for timestamp in self.requests[user_ip]
            if current_time - timestamp < self.time_window.total_seconds()
        ]
    
    def is_rate_limited(self, user_ip: str) -> bool:
        """Check if the user has exceeded their rate limit."""
        self._cleanup_old_requests(user_ip)
        
        # Get current count after cleanup
        current_count = len(self.requests[user_ip])
        
        # Add current request timestamp (incrementing the count)
        current_time = time.time()
        self.requests[user_ip].append(current_time)
        
        # Check if user has exceeded the maximum requests
        return (current_count + 1) > self.max_requests
    
    def get_current_count(self, user_ip: str) -> int:
        """Get the current request count for an IP."""
        self._cleanup_old_requests(user_ip)
        return len(self.requests[user_ip])


# Initialize rate limiter with 100 requests per day
rate_limiter = RateLimiter(
    max_requests=6,
    time_window=timedelta(days=1)
)

def get_user_ip(request: Request) -> str:
    """Helper function to get user's IP address."""
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        return forwarded.split(",")[0]
    return request.client.host


class ApiRotator:
    def __init__(self, apis):
        self.apis = apis
        self.last_successful_index = None

    def get_prioritized_apis(self):
        if self.last_successful_index is not None:
            # Move the last successful API to the front
            rotated_apis = (
                [self.apis[self.last_successful_index]] + 
                self.apis[:self.last_successful_index] + 
                self.apis[self.last_successful_index+1:]
            )
            return rotated_apis
        return self.apis

    def update_last_successful(self, index):
        self.last_successful_index = index

@app.post("/track_dl")
async def track_dl(request: TrackDownloadRequest, req: Request):
    user_ip = get_user_ip(req)
    
    if rate_limiter.is_rate_limited(user_ip):
        current_count = rate_limiter.get_current_count(user_ip)
        raise HTTPException(
            status_code=429,
            detail={
                "error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
                "url": "https://t.me/chrunoss"
            }
        )
    try:
        quality_num = int(request.quality)
        if quality_num > 128 or request.quality.upper() == 'FLAC':
            raise HTTPException(
                status_code=400,
                detail={
                    "error": "Quality above 128 or FLAC is for Premium users Only.",
                    "premium": "https://chrunos.com/premium-shortcuts/"
                }
            )

        dl_url = await get_track_download_url(request.track_id, request.quality)
        
        if dl_url and "http" in dl_url:
            return {
                "url": dl_url,
                "premium": "https://chrunos.com/premium-shortcuts/"
            }
        else:
            raise HTTPException(
                status_code=400,
                detail={
                    "error": "Failed to Fetch the Track.",
                    "premium": "https://chrunos.com/premium-shortcuts/"
                }
            )

    except ValueError:
        raise HTTPException(
            status_code=400,
            detail={
                "error": "Invalid quality value provided. It should be a valid integer or FLAC.",
                "premium": "https://chrunos.com/premium-shortcuts/"
            }
        )

@app.get("/get_artist")
async def get_artist(id: str):
    artist_info = ytmusic.get_artist(id)
    return artist_info

@app.get("/get_album")
async def get_album(id: str):
    album_info = ytmusic.get_album(id)
    return album_info

@app.get("/get_song")
async def get_song(id: str):
    song_info = ytmusic.get_song(id)
    return song_info

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='0.0.0.0', port=7860)