# Spaces: Sleeping
# (The line above appears to be page-status text captured with the listing; it is not program code.)
| from flask import Flask, request, jsonify | |
| import yt_dlp | |
| import os | |
| from urllib.parse import urlparse, parse_qs | |
| import logging | |
| import json | |
| from datetime import datetime | |
| import glob | |
# The Flask application object.
app = Flask(__name__)

# Root logging at INFO level; this module logs through its own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Cookie files are kept in COOKIES_DIR (overridable through the environment)
# and the rotation state is persisted beside them as JSON.
COOKIES_DIR = os.getenv('COOKIES_DIR', 'cookies')
COOKIE_STATE_FILE = os.path.join(COOKIES_DIR, 'cookie_state.json')

# Make sure the directory is there before anything reads from or writes to it.
os.makedirs(COOKIES_DIR, exist_ok=True)
class CookieManager:
    """Keep track of the cookie files in a directory and which one is active.

    The state is a dict persisted as JSON with (at least) two keys:
    ``current_cookie`` -- filename of the active cookie, or ``None`` -- and
    ``cookie_status`` -- a mapping of filename to the last status record
    written by ``rotate_cookie``, ``mark_cookie_failed`` or
    ``mark_cookie_working``.
    """

    def __init__(self, cookies_dir=None, state_file=None):
        """Load the persisted state, selecting a first cookie if none is set.

        Args:
            cookies_dir: Directory holding the ``*.txt`` cookie files.
                Defaults to the module-level ``COOKIES_DIR``.
            state_file: Path of the JSON state file. Defaults to the
                module-level ``COOKIE_STATE_FILE`` when ``cookies_dir`` is
                defaulted too, otherwise to ``cookie_state.json`` inside
                ``cookies_dir``.
        """
        if cookies_dir is None:
            self.cookies_dir = COOKIES_DIR
            default_state_file = COOKIE_STATE_FILE
        else:
            self.cookies_dir = cookies_dir
            default_state_file = os.path.join(cookies_dir, 'cookie_state.json')
        self.state_file = default_state_file if state_file is None else state_file
        self.load_state()

    @staticmethod
    def _empty_state():
        """Return a fresh state dict with no active cookie and no statuses."""
        return {'current_cookie': None, 'cookie_status': {}}

    def load_state(self):
        """Load cookie state from file.

        A missing, unreadable or malformed state file falls back to an empty
        state instead of raising. Keys absent from the file are filled in with
        their defaults so later lookups cannot fail.
        """
        state = self._empty_state()
        try:
            with open(self.state_file, 'r') as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # OSError: file missing/unreadable. ValueError: invalid JSON or
            # undecodable bytes. Either way start from a clean state.
            loaded = None

        if isinstance(loaded, dict):
            state.update(loaded)
            if not isinstance(state['cookie_status'], dict):
                state['cookie_status'] = {}
            if not isinstance(state['current_cookie'], str):
                state['current_cookie'] = None
        self.state = state

        # Auto-select first available cookie if none selected
        if not self.state['current_cookie']:
            cookies = self.list_cookies()
            if cookies:
                self.state['current_cookie'] = cookies[0]
                self.save_state()

    def save_state(self):
        """Save cookie state to file as indented JSON."""
        with open(self.state_file, 'w') as f:
            json.dump(self.state, f, indent=2)

    def list_cookies(self):
        """Return the sorted filenames of all ``*.txt`` files in the directory.

        Sorting matters: ``glob`` makes no ordering promise, and rotation
        walks this list by index.
        """
        pattern = os.path.join(glob.escape(self.cookies_dir), '*.txt')
        return sorted(
            os.path.basename(path)
            for path in glob.glob(pattern)
            if os.path.isfile(path)
        )

    def get_current_cookie(self):
        """Return the path of the active cookie file, or None.

        None is returned both when no cookie is selected and when the
        selected file no longer exists.
        """
        current = self.state['current_cookie']
        if current:
            cookie_path = os.path.join(self.cookies_dir, current)
            if os.path.isfile(cookie_path):
                return cookie_path
        return None

    def get_current_cookie_name(self):
        """Return the filename of the active cookie (may be None)."""
        return self.state['current_cookie']

    def set_current_cookie(self, cookie_name):
        """Make *cookie_name* the active cookie.

        Only names reported by ``list_cookies`` are accepted, which rejects
        path-traversal input such as ``../x.txt`` and non-cookie files.

        Returns:
            True if the cookie was selected and the state saved, else False.
        """
        if cookie_name not in self.list_cookies():
            return False
        self.state['current_cookie'] = cookie_name
        self.save_state()
        return True

    def rotate_cookie(self):
        """Rotate to the next available cookie.

        Returns:
            ``(True, next_cookie_name)`` on success, or
            ``(False, message)`` when there is no cookie other than the
            active one to switch to.
        """
        cookies = self.list_cookies()
        current = self.state['current_cookie']

        if not any(name != current for name in cookies):
            return False, "No other cookies available to rotate"

        if current in cookies:
            next_cookie = cookies[(cookies.index(current) + 1) % len(cookies)]
        else:
            # Nothing selected yet, or the selected file has disappeared.
            next_cookie = cookies[0]

        self.state['current_cookie'] = next_cookie
        if current is not None:
            # Skip the status record when nothing was active; a None key
            # would be serialised as a bogus "null" entry.
            self.state['cookie_status'][current] = {
                'status': 'rotated',
                'timestamp': datetime.now().isoformat()
            }
        self.save_state()
        return True, next_cookie

    def _record_status(self, cookie_name, status, **extra):
        """Store a timestamped status record for *cookie_name* and persist it."""
        record = {'status': status}
        record.update(extra)
        record['timestamp'] = datetime.now().isoformat()
        self.state['cookie_status'][cookie_name] = record
        self.save_state()

    def mark_cookie_failed(self, cookie_name, error):
        """Mark a cookie as failed, keeping ``str(error)`` in the record."""
        self._record_status(cookie_name, 'failed', error=str(error))

    def mark_cookie_working(self, cookie_name):
        """Mark a cookie as working."""
        self._record_status(cookie_name, 'working')

    def get_cookie_info(self):
        """Return the active cookie, the available cookies and their statuses."""
        cookies = self.list_cookies()
        return {
            'current_cookie': self.state['current_cookie'],
            'available_cookies': cookies,
            'total_cookies': len(cookies),
            'cookie_status': self.state['cookie_status']
        }
# Module-wide CookieManager instance used by the functions below.
cookie_manager = CookieManager()
def get_video_id(url):
    """Extract the video ID from a YouTube URL.

    Recognised forms are ``/watch?v=ID``, ``/embed/ID``, ``/shorts/ID``,
    ``/live/ID`` and ``/v/ID`` on youtube.com (including the ``www.``,
    ``m.`` and ``music.`` hosts) and ``youtu.be/ID``.

    Args:
        url: The URL to inspect.

    Returns:
        The video ID string, or None when *url* is not a string, is not a
        recognised YouTube URL, or carries no ID.
    """
    if not isinstance(url, str) or not url:
        return None
    try:
        parsed_url = urlparse(url)
        hostname = parsed_url.hostname
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. a broken IPv6 host).
        return None

    path = parsed_url.path

    if hostname == 'youtu.be':
        # Keep only the first path segment: "/ID/extra" -> "ID".
        return path.lstrip('/').split('/', 1)[0] or None

    if hostname in ('www.youtube.com', 'youtube.com',
                    'm.youtube.com', 'music.youtube.com'):
        if path == '/watch':
            # A missing "v" parameter yields None rather than a KeyError.
            ids = parse_qs(parsed_url.query).get('v')
            return ids[0] if ids else None
        for prefix in ('/embed/', '/shorts/', '/live/', '/v/'):
            if path.startswith(prefix):
                return path[len(prefix):].split('/', 1)[0] or None

    return None
def _format_bitrate(fmt):
    """Sort key for a format dict: audio bitrate, else total bitrate, else 0."""
    # "abr"/"tbr" may be present with a None value; never hand None to max().
    return fmt.get('abr') or fmt.get('tbr') or 0


def _pick_audio_url(info):
    """Return the best audio stream URL in an extracted info dict, or None.

    A top-level ``url`` wins. Otherwise the highest-bitrate audio-only format
    is used, falling back to the highest-bitrate format that has audio at all.
    """
    if info.get('url'):
        return info['url']

    with_audio = [f for f in (info.get('formats') or [])
                  if f.get('url') and f.get('acodec') != 'none']
    audio_only = [f for f in with_audio if f.get('vcodec') == 'none']
    candidates = audio_only or with_audio
    if not candidates:
        return None
    return max(candidates, key=_format_bitrate)['url']


def get_audio_url(youtube_url, auto_rotate=True):
    """Get a direct audio stream URL using yt-dlp.

    The active cookie file (if any) is passed to yt-dlp. On success the
    cookie is marked as working. On a failure whose message looks
    authentication-related, the cookie is marked as failed and, when
    *auto_rotate* is true, the next cookie is selected and the extraction is
    retried exactly once.

    Args:
        youtube_url: URL handed to yt-dlp.
        auto_rotate: Whether to rotate cookies and retry after an
            authentication-looking failure.

    Returns:
        A dict. On success: ``success`` (True), ``audio_url``, ``title``,
        ``duration``, ``thumbnail``, ``video_id`` and ``cookie_used``.
        On failure: ``success`` (False), ``error``, ``cookie_used`` and
        ``is_auth_error``. Exceptions are reported in the dict, not raised.
    """
    current_cookie = cookie_manager.get_current_cookie()
    current_cookie_name = cookie_manager.get_current_cookie_name()

    ydl_opts = {
        'format': 'bestaudio/best',
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
        'cookiefile': current_cookie,
        # NOTE(review): this turns off TLS certificate verification for the
        # requests yt-dlp makes. Kept as-is to avoid changing deployment
        # behaviour; confirm it is really needed.
        'nocheckcertificate': True,
        'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(youtube_url, download=False)

        audio_url = _pick_audio_url(info) if info else None
        if not audio_url:
            raise Exception("Could not find audio stream")

        # Mark cookie as working
        if current_cookie_name:
            cookie_manager.mark_cookie_working(current_cookie_name)

        # The ID is informational; a URL this helper cannot parse must not
        # turn a successful extraction into a failure.
        try:
            video_id = get_video_id(youtube_url)
        except (KeyError, IndexError):
            video_id = None

        return {
            'success': True,
            'audio_url': audio_url,
            'title': info.get('title', 'Unknown'),
            'duration': info.get('duration', 0),
            'thumbnail': info.get('thumbnail', ''),
            'video_id': video_id,
            'cookie_used': current_cookie_name
        }
    except Exception as e:
        error_text = str(e)
        logger.error("Error extracting audio: %s", error_text)

        # Check if error is related to authentication/cookies
        lowered = error_text.lower()
        is_auth_error = any(
            keyword in lowered
            for keyword in ('sign in', 'login', 'cookies',
                            'authentication', 'forbidden', 'bot')
        )

        # Mark current cookie as failed
        if current_cookie_name and is_auth_error:
            cookie_manager.mark_cookie_failed(current_cookie_name, error_text)

        # Auto-rotate only if enabled AND an auth error was detected; other
        # failures (bad video, network) would not be helped by a new cookie.
        if auto_rotate and is_auth_error:
            success, next_cookie = cookie_manager.rotate_cookie()
            if success:
                logger.info(f"Auto-rotating to cookie: {next_cookie}")
                return get_audio_url(youtube_url, auto_rotate=False)  # Try once more

        return {
            'success': False,
            'error': error_text,
            'cookie_used': current_cookie_name,
            'is_auth_error': is_auth_error
        }
@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint (``GET /health``).

    Returns a JSON object with a fixed ``status`` of ``healthy`` plus the
    active cookie name, whether its file exists, and the available cookies.
    """
    cookie_info = cookie_manager.get_cookie_info()
    # get_current_cookie() yields a path only when the file is really there.
    cookie_path = cookie_manager.get_current_cookie()

    return jsonify({
        'status': 'healthy',
        'current_cookie': cookie_info['current_cookie'],
        'current_cookie_exists': cookie_path is not None,
        'total_cookies': cookie_info['total_cookies'],
        'available_cookies': cookie_info['available_cookies']
    })
def _is_youtube_url(url):
    """Return True when *url* points at youtube.com (any subdomain) or youtu.be.

    The check is made on the parsed hostname, so a foreign host that merely
    mentions "youtube.com" in its path or query string is rejected.
    Scheme-less input such as ``youtube.com/watch?v=...`` is still accepted.
    """
    if not isinstance(url, str):
        return False
    candidate = url.strip()
    if '://' not in candidate:
        candidate = 'https://' + candidate
    try:
        parts = urlparse(candidate)
        host = parts.hostname or ''
    except ValueError:
        return False
    if parts.scheme not in ('http', 'https'):
        return False
    return (host == 'youtu.be'
            or host == 'youtube.com'
            or host.endswith('.youtube.com'))


@app.route('/api/audio', methods=['GET', 'POST'])
def get_audio():
    """Main endpoint to get an audio URL (``GET``/``POST /api/audio``).

    GET reads ``url`` and ``auto_rotate`` from the query string; any other
    method reads them from a JSON object body. A missing or non-JSON body is
    treated as empty instead of causing a server error.

    Returns:
        The result of ``get_audio_url`` as JSON with status 200 on success or
        500 on failure; status 400 when the URL is missing or is not a
        YouTube URL.
    """
    try:
        # Get URL from query parameter or JSON body
        if request.method == 'GET':
            youtube_url = request.args.get('url')
            auto_rotate = request.args.get('auto_rotate', 'true').lower() == 'true'
        else:
            payload = request.get_json(silent=True)
            if not isinstance(payload, dict):
                payload = {}
            youtube_url = payload.get('url')
            auto_rotate = payload.get('auto_rotate', True)
            if isinstance(auto_rotate, str):
                # "false" sent as a string must not count as truthy.
                auto_rotate = auto_rotate.lower() == 'true'
            else:
                auto_rotate = bool(auto_rotate)

        if not youtube_url:
            return jsonify({
                'success': False,
                'error': 'YouTube URL is required. Use ?url=<youtube_url> or POST with {"url": "<youtube_url>"}'
            }), 400

        # Validate YouTube URL
        if not _is_youtube_url(youtube_url):
            return jsonify({
                'success': False,
                'error': 'Invalid YouTube URL'
            }), 400

        result = get_audio_url(youtube_url, auto_rotate=auto_rotate)
        status = 200 if result['success'] else 500
        return jsonify(result), status
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Server error: {str(e)}'
        }), 500
@app.route('/api/cookies', methods=['GET'])
def list_cookies():
    """List all available cookies and their status (``GET /api/cookies``).

    Returns the dict produced by ``cookie_manager.get_cookie_info()`` as JSON
    with status 200.
    """
    return jsonify(cookie_manager.get_cookie_info()), 200
@app.route('/api/cookies/current', methods=['GET'])
def get_current_cookie():
    """Get the current active cookie (``GET /api/cookies/current``).

    Returns JSON with ``current_cookie`` (the stored filename, possibly null)
    and ``cookie_exists`` (whether that file is actually present).
    """
    active_name = cookie_manager.get_current_cookie_name()
    active_path = cookie_manager.get_current_cookie()
    return jsonify({
        'current_cookie': active_name,
        'cookie_exists': active_path is not None
    }), 200
@app.route('/api/cookies/switch', methods=['POST'])
def switch_cookie():
    """Switch to a specific cookie (``POST /api/cookies/switch``).

    Expects a JSON object body with ``cookie_name``.

    Returns:
        200 with the new current cookie on success, 400 when ``cookie_name``
        is missing (including a missing or non-JSON body), 404 when no such
        cookie file is available.
    """
    payload = request.get_json(silent=True)
    cookie_name = payload.get('cookie_name') if isinstance(payload, dict) else None

    if not cookie_name:
        return jsonify({
            'success': False,
            'error': 'cookie_name is required'
        }), 400

    # Only names the manager itself lists are eligible; this keeps
    # path-traversal input ("../x") from ever reaching the filesystem.
    known = isinstance(cookie_name, str) and cookie_name in cookie_manager.list_cookies()
    if known and cookie_manager.set_current_cookie(cookie_name):
        return jsonify({
            'success': True,
            'message': f'Switched to cookie: {cookie_name}',
            'current_cookie': cookie_name
        }), 200

    return jsonify({
        'success': False,
        'error': f'Cookie file not found: {cookie_name}'
    }), 404
@app.route('/api/cookies/rotate', methods=['POST'])
def rotate_cookie():
    """Rotate to the next available cookie (``POST /api/cookies/rotate``).

    Returns:
        200 with the new current cookie when rotation happened, otherwise 400
        with the manager's explanation in ``error``.
    """
    rotated, outcome = cookie_manager.rotate_cookie()

    if not rotated:
        # On failure the second element is the error message.
        return jsonify({
            'success': False,
            'error': outcome
        }), 400

    return jsonify({
        'success': True,
        'message': f'Rotated to cookie: {outcome}',
        'current_cookie': outcome
    }), 200
@app.route('/api/cookies/upload', methods=['POST'])
def upload_cookie():
    """Upload a new cookie file (``POST /api/cookies/upload``, multipart).

    The file is read from the ``file`` form field. Only the final path
    component of the client-supplied filename is used, so the upload cannot
    be written outside the cookies directory. A ``.txt`` suffix is appended
    when absent. If no usable cookie is active afterwards, the uploaded file
    becomes the current cookie.

    Returns:
        200 with the stored filename on success, 400 when no file or no
        usable filename was supplied, 500 when saving fails.
    """
    if 'file' not in request.files:
        return jsonify({
            'success': False,
            'error': 'No file provided'
        }), 400

    upload = request.files['file']

    if not upload.filename:
        return jsonify({
            'success': False,
            'error': 'No file selected'
        }), 400

    # The filename is untrusted: drop any directory part (either separator
    # style) and refuse dot-names / hidden files.
    filename = os.path.basename(upload.filename.replace('\\', '/'))
    if not filename or filename.startswith('.'):
        return jsonify({
            'success': False,
            'error': 'Invalid filename'
        }), 400

    # Ensure .txt extension
    if not filename.endswith('.txt'):
        filename += '.txt'

    try:
        # Save file
        upload.save(os.path.join(COOKIES_DIR, filename))

        # If this is the first cookie, set it as current
        if not cookie_manager.get_current_cookie():
            cookie_manager.set_current_cookie(filename)

        return jsonify({
            'success': True,
            'message': f'Cookie uploaded successfully: {filename}',
            'filename': filename,
            'total_cookies': len(cookie_manager.list_cookies())
        }), 200
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Failed to upload cookie: {str(e)}'
        }), 500
@app.route('/api/cookies/delete', methods=['DELETE'])
def delete_cookie():
    """Delete a cookie file (``DELETE /api/cookies/delete``).

    Expects a JSON object body with ``cookie_name``. Only files the cookie
    manager lists can be deleted, so names such as ``../app.py`` or the state
    file are answered with 404 instead of being removed. Deleting the active
    cookie first rotates to another one; the only remaining cookie cannot be
    deleted while it is active.

    Returns:
        200 on success, 400 when ``cookie_name`` is missing or the request
        would remove the only cookie, 404 when the cookie is unknown, 500
        when removal fails.
    """
    payload = request.get_json(silent=True)
    cookie_name = payload.get('cookie_name') if isinstance(payload, dict) else None

    if not cookie_name:
        return jsonify({
            'success': False,
            'error': 'cookie_name is required'
        }), 400

    available = cookie_manager.list_cookies()
    if not isinstance(cookie_name, str) or cookie_name not in available:
        return jsonify({
            'success': False,
            'error': f'Cookie not found: {cookie_name}'
        }), 404

    # Don't allow deleting the current cookie if it's the only one
    if cookie_manager.get_current_cookie_name() == cookie_name:
        if len(available) == 1:
            return jsonify({
                'success': False,
                'error': 'Cannot delete the only available cookie'
            }), 400
        # Rotate to another cookie first
        cookie_manager.rotate_cookie()

    try:
        os.remove(os.path.join(COOKIES_DIR, cookie_name))
        return jsonify({
            'success': True,
            'message': f'Cookie deleted: {cookie_name}',
            'total_cookies': len(cookie_manager.list_cookies())
        }), 200
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Failed to delete cookie: {str(e)}'
        }), 500
# NOTE(review): no route for this handler is visible in the file; "/" is
# assumed for the documentation index -- confirm against the deployment.
@app.route('/', methods=['GET'])
def home():
    """API documentation.

    Returns a JSON object naming the service and describing its endpoints
    and the cookie-management calls.
    """
    endpoints = {
        '/health': 'GET - Health check',
        '/api/audio': 'GET/POST - Get audio stream URL',
        '/api/cookies': 'GET - List all cookies',
        '/api/cookies/current': 'GET - Get current active cookie',
        '/api/cookies/switch': 'POST - Switch to specific cookie',
        '/api/cookies/rotate': 'POST - Rotate to next cookie',
        '/api/cookies/upload': 'POST - Upload new cookie file',
        '/api/cookies/delete': 'DELETE - Delete a cookie file'
    }
    cookie_management = {
        'list_cookies': 'GET /api/cookies',
        'switch_cookie': 'POST /api/cookies/switch {"cookie_name": "cookies1.txt"}',
        'rotate_cookie': 'POST /api/cookies/rotate',
        'upload_cookie': 'POST /api/cookies/upload (multipart/form-data)',
        'delete_cookie': 'DELETE /api/cookies/delete {"cookie_name": "cookies1.txt"}'
    }
    return jsonify({
        'name': 'YouTube Audio API with Cookie Management',
        'version': '2.0',
        'endpoints': endpoints,
        'cookie_management': cookie_management
    })
if __name__ == '__main__':
    # Honour PORT when set. It used to be read and then ignored in favour of
    # a hard-coded 7860, which therefore stays the default.
    port = int(os.environ.get('PORT', 7860))
    # Debug mode enables the interactive debugger, which must not be exposed
    # on a server bound to all interfaces; it is now opt-in via FLASK_DEBUG.
    debug = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=port, debug=debug)