# voice-chat-api / app.py
# NitinBot001's picture
# Update app.py
# ad097e4 verified
from flask import Flask, request, jsonify
import yt_dlp
import os
from urllib.parse import urlparse, parse_qs
import logging
import json
from datetime import datetime
import glob
# WSGI application object; the route decorators further down register their
# handlers on it.
app = Flask(__name__)
# Logging setup: INFO-level basic configuration plus a module-scoped logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Cookie management: COOKIES_DIR is the directory holding the cookie files
# (overridable through the COOKIES_DIR environment variable, default
# 'cookies'); COOKIE_STATE_FILE is the JSON file inside it that persists the
# cookie manager's state.
COOKIES_DIR = os.environ.get('COOKIES_DIR', 'cookies')
COOKIE_STATE_FILE = os.path.join(COOKIES_DIR, 'cookie_state.json')
# Create the cookies directory at import time if it does not exist yet
os.makedirs(COOKIES_DIR, exist_ok=True)
class CookieManager:
    """Manage the cookie files in a directory and remember the active one.

    Cookie files are the ``*.txt`` files directly inside the cookies
    directory. The active cookie name and a per-cookie status record live in
    ``self.state`` (keys ``'current_cookie'`` and ``'cookie_status'``) and are
    persisted as JSON so they survive restarts.

    Args:
        cookies_dir: Directory holding the cookie files. Defaults to the
            module-level ``COOKIES_DIR``.
        state_file: Path of the JSON state file. Defaults to the module-level
            ``COOKIE_STATE_FILE``, or to ``cookie_state.json`` inside
            ``cookies_dir`` when only ``cookies_dir`` is given.
    """

    def __init__(self, cookies_dir=None, state_file=None):
        if cookies_dir is None:
            self.cookies_dir = COOKIES_DIR
            default_state_file = COOKIE_STATE_FILE
        else:
            self.cookies_dir = cookies_dir
            default_state_file = os.path.join(cookies_dir, 'cookie_state.json')
        self.state_file = default_state_file if state_file is None else state_file
        # The state file is written inside this directory, so it must exist.
        os.makedirs(self.cookies_dir, exist_ok=True)
        self.load_state()

    @staticmethod
    def _is_plain_filename(name):
        """Return True if *name* is a bare file name with no directory part."""
        return (
            isinstance(name, str)
            and name not in ('', '.', '..')
            and os.path.basename(name) == name
        )

    def load_state(self):
        """Load cookie state from file.

        A missing, unreadable, or malformed state file yields the default
        empty state instead of raising. If no cookie is selected afterwards,
        the first available cookie (in sorted order) is selected and saved.
        """
        state = {'current_cookie': None, 'cookie_status': {}}
        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            loaded = None
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and decoding errors.
            logging.getLogger(__name__).warning(
                "Ignoring unreadable cookie state file %s: %s",
                self.state_file, exc)
            loaded = None

        # Valid JSON is not necessarily a dict with the keys we rely on.
        if isinstance(loaded, dict):
            state.update(loaded)
        if not isinstance(state['cookie_status'], dict):
            state['cookie_status'] = {}
        if not self._is_plain_filename(state['current_cookie']):
            state['current_cookie'] = None
        self.state = state

        # Auto-select first available cookie if none selected
        if not self.state['current_cookie']:
            cookies = self.list_cookies()
            if cookies:
                self.state['current_cookie'] = cookies[0]
                self.save_state()

    def save_state(self):
        """Save cookie state to file as indented JSON."""
        with open(self.state_file, 'w', encoding='utf-8') as f:
            json.dump(self.state, f, indent=2)

    def list_cookies(self):
        """Return the names of all ``*.txt`` cookie files, sorted by name.

        Sorting makes the rotation order and the auto-selected first cookie
        deterministic; glob returns files in arbitrary order.
        """
        cookie_files = glob.glob(os.path.join(self.cookies_dir, '*.txt'))
        return sorted(os.path.basename(f) for f in cookie_files)

    def get_current_cookie(self):
        """Return the path of the active cookie file, or None.

        None is returned when no cookie is selected or the selected file no
        longer exists on disk.
        """
        current = self.state['current_cookie']
        if current:
            cookie_path = os.path.join(self.cookies_dir, current)
            if os.path.isfile(cookie_path):
                return cookie_path
        return None

    def get_current_cookie_name(self):
        """Return the active cookie's file name (None if nothing selected)."""
        return self.state['current_cookie']

    def set_current_cookie(self, cookie_name):
        """Make *cookie_name* the active cookie.

        Returns:
            True if the cookie exists and was selected; False if the name is
            not a bare file name inside the cookies directory or the file
            does not exist.
        """
        # Reject names such as '../x' that would escape the cookies directory.
        if not self._is_plain_filename(cookie_name):
            return False
        cookie_path = os.path.join(self.cookies_dir, cookie_name)
        if not os.path.isfile(cookie_path):
            return False
        self.state['current_cookie'] = cookie_name
        self.save_state()
        return True

    def rotate_cookie(self):
        """Rotate to the next available cookie.

        Returns:
            ``(True, next_cookie_name)`` on success, or ``(False, message)``
            when there are fewer than two cookies to rotate between.
        """
        cookies = self.list_cookies()
        if len(cookies) <= 1:
            return False, "No other cookies available to rotate"

        current = self.state['current_cookie']
        try:
            next_cookie = cookies[(cookies.index(current) + 1) % len(cookies)]
        except ValueError:
            # The current cookie is unset or no longer on disk.
            next_cookie = cookies[0]

        self.state['current_cookie'] = next_cookie
        if current is not None:
            previous = self.state['cookie_status'].get(current)
            already_failed = (
                isinstance(previous, dict) and previous.get('status') == 'failed'
            )
            # Keep a recorded failure (and its error text) instead of
            # overwriting it with a less informative 'rotated' entry.
            if not already_failed:
                self.state['cookie_status'][current] = {
                    'status': 'rotated',
                    'timestamp': datetime.now().isoformat()
                }
        self.save_state()
        return True, next_cookie

    def mark_cookie_failed(self, cookie_name, error):
        """Record *cookie_name* as failed, with the error text and a timestamp."""
        self.state['cookie_status'][cookie_name] = {
            'status': 'failed',
            'error': str(error),
            'timestamp': datetime.now().isoformat()
        }
        self.save_state()

    def mark_cookie_working(self, cookie_name):
        """Record *cookie_name* as working, with a timestamp."""
        self.state['cookie_status'][cookie_name] = {
            'status': 'working',
            'timestamp': datetime.now().isoformat()
        }
        self.save_state()

    def get_cookie_info(self):
        """Return the active cookie, the available cookies, and their status."""
        cookies = self.list_cookies()
        return {
            'current_cookie': self.state['current_cookie'],
            'available_cookies': cookies,
            'total_cookies': len(cookies),
            'cookie_status': self.state['cookie_status']
        }
# Initialize the shared cookie manager at import time; the extraction helper
# and the route handlers below all use this single module-level instance.
cookie_manager = CookieManager()
def get_video_id(url):
    """Extract the video ID from a YouTube URL.

    Recognizes ``/watch?v=<id>``, ``/embed/<id>``, ``/shorts/<id>``,
    ``/live/<id>`` and ``/v/<id>`` on ``youtube.com`` (including subdomains
    such as ``www.``, ``m.`` and ``music.``), and ``youtu.be/<id>``.

    Args:
        url: The URL to inspect.

    Returns:
        The video ID string, or None when *url* is not a string, is not a
        recognized YouTube URL, or carries no ID.
    """
    if not isinstance(url, str) or not url.strip():
        return None
    try:
        parsed_url = urlparse(url.strip())
        hostname = (parsed_url.hostname or '').lower()
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. a broken IPv6 host).
        return None

    if hostname == 'youtube.com' or hostname.endswith('.youtube.com'):
        if parsed_url.path == '/watch':
            # A /watch URL without ?v= has no ID; return None, don't raise.
            values = parse_qs(parsed_url.query).get('v')
            return values[0] if values else None
        if parsed_url.path.startswith(('/embed/', '/shorts/', '/live/', '/v/')):
            return parsed_url.path.split('/')[2] or None
    elif hostname == 'youtu.be':
        # Only the first path segment is the ID (ignore a trailing slash).
        return parsed_url.path.lstrip('/').split('/')[0] or None
    return None
def _format_bitrate(fmt):
    """Return a sortable bitrate for a yt-dlp format dict (0 when unknown)."""
    # 'abr'/'tbr' may be present but None, so `.get(key, 0)` is not enough.
    return fmt.get('abr') or fmt.get('tbr') or 0


def _select_audio_url(info):
    """Return the best direct audio URL from a yt-dlp info dict, or None."""
    if info.get('url'):
        return info['url']
    # Only formats that actually carry a URL can be returned to the caller.
    usable = [f for f in info.get('formats') or [] if f.get('url')]
    with_audio = [f for f in usable if f.get('acodec') != 'none']
    audio_only = [f for f in with_audio if f.get('vcodec') == 'none']
    # Prefer audio-only streams; fall back to any format that has audio.
    candidates = audio_only or with_audio
    if not candidates:
        return None
    return max(candidates, key=_format_bitrate)['url']


def get_audio_url(youtube_url, auto_rotate=True):
    """Get a direct audio stream URL for a video using yt-dlp.

    The extraction uses the cookie file currently selected in
    ``cookie_manager``. On success the cookie is marked as working. On an
    authentication-looking failure the cookie is marked as failed and, when
    *auto_rotate* is true, the manager rotates to the next cookie and the
    extraction is retried exactly once.

    Args:
        youtube_url: URL of the video to resolve.
        auto_rotate: Whether to rotate cookies and retry once after an
            authentication-related error.

    Returns:
        On success, a dict with ``success=True`` plus ``audio_url``,
        ``title``, ``duration``, ``thumbnail``, ``video_id`` and
        ``cookie_used``. On failure, a dict with ``success=False`` plus
        ``error``, ``cookie_used`` and ``is_auth_error``. Exceptions are
        reported through the failure dict rather than raised.
    """
    current_cookie = cookie_manager.get_current_cookie()
    current_cookie_name = cookie_manager.get_current_cookie_name()

    ydl_opts = {
        'format': 'bestaudio/best',
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
        'cookiefile': current_cookie if current_cookie else None,
        # NOTE(review): this disables TLS certificate verification; kept for
        # backward compatibility, but confirm it is really required.
        'nocheckcertificate': True,
        'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(youtube_url, download=False)

        audio_url = _select_audio_url(info) if info else None
        if not audio_url:
            raise Exception("Could not find audio stream")

        # A URL-parsing problem must not turn a successful extraction into a
        # failure; fall back to the ID reported by yt-dlp.
        try:
            video_id = get_video_id(youtube_url)
        except (KeyError, IndexError):
            video_id = None
        video_id = video_id or info.get('id')

        # Mark cookie as working
        if current_cookie_name:
            cookie_manager.mark_cookie_working(current_cookie_name)

        return {
            'success': True,
            'audio_url': audio_url,
            'title': info.get('title', 'Unknown'),
            'duration': info.get('duration', 0),
            'thumbnail': info.get('thumbnail', ''),
            'video_id': video_id,
            'cookie_used': current_cookie_name
        }
    except Exception as e:
        error_str = str(e).lower()
        logger.error("Error extracting audio: %s", e)

        # Check if error is related to authentication/cookies
        is_auth_error = any(
            keyword in error_str
            for keyword in ('sign in', 'login', 'cookies', 'authentication',
                            'forbidden', 'bot')
        )

        # Mark current cookie as failed
        if current_cookie_name and is_auth_error:
            cookie_manager.mark_cookie_failed(current_cookie_name, str(e))

            # Auto-rotate if enabled and auth error detected
            if auto_rotate:
                success, next_cookie = cookie_manager.rotate_cookie()
                if success:
                    logger.info("Auto-rotating to cookie: %s", next_cookie)
                    # auto_rotate=False bounds the retry to a single attempt.
                    return get_audio_url(youtube_url, auto_rotate=False)

        return {
            'success': False,
            'error': str(e),
            'cookie_used': current_cookie_name,
            'is_auth_error': is_auth_error
        }
@app.route('/health', methods=['GET'])
def health_check():
    """Report service health together with a summary of the cookie pool."""
    info = cookie_manager.get_cookie_info()
    active_path = cookie_manager.get_current_cookie()
    payload = {
        'status': 'healthy',
        'current_cookie': info['current_cookie'],
        # The path is None when no cookie is selected or its file is missing.
        'current_cookie_exists': active_path is not None,
        'total_cookies': info['total_cookies'],
        'available_cookies': info['available_cookies'],
    }
    return jsonify(payload)
def _is_youtube_url(url):
    """Return True if *url*'s host is youtube.com, youtu.be, or a subdomain."""
    candidate = url.strip()
    # Accept scheme-less input such as 'youtube.com/watch?v=...'.
    if '://' not in candidate:
        candidate = '//' + candidate.lstrip('/')
    try:
        host = (urlparse(candidate).hostname or '').lower()
    except ValueError:
        return False
    return any(
        host == domain or host.endswith('.' + domain)
        for domain in ('youtube.com', 'youtu.be')
    )


@app.route('/api/audio', methods=['GET', 'POST'])
def get_audio():
    """Main endpoint to get audio URL.

    GET reads ``url`` and ``auto_rotate`` from the query string; POST reads
    them from a JSON object body. Responds 400 when the URL is missing or is
    not a YouTube URL, 200 with the extraction result on success, and 500
    when extraction fails or an unexpected error occurs.
    """
    try:
        # Get URL from query parameter or JSON body
        if request.method == 'GET':
            youtube_url = request.args.get('url')
            auto_rotate = request.args.get('auto_rotate', 'true').lower() == 'true'
        else:
            # silent=True: a missing or malformed JSON body yields None, so
            # the client gets the 400 below rather than a 500.
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                data = {}
            youtube_url = data.get('url')
            auto_rotate = data.get('auto_rotate', True)
            if isinstance(auto_rotate, str):
                # Treat "false" like the query-string form does.
                auto_rotate = auto_rotate.lower() == 'true'
            else:
                auto_rotate = bool(auto_rotate)

        if not youtube_url or not isinstance(youtube_url, str):
            return jsonify({
                'success': False,
                'error': 'YouTube URL is required. Use ?url=<youtube_url> or POST with {"url": "<youtube_url>"}'
            }), 400

        # Validate the host, not a substring: 'youtube.com' appearing in a
        # path or query string must not make an arbitrary URL acceptable.
        if not _is_youtube_url(youtube_url):
            return jsonify({
                'success': False,
                'error': 'Invalid YouTube URL'
            }), 400

        result = get_audio_url(youtube_url.strip(), auto_rotate=auto_rotate)
        return jsonify(result), (200 if result['success'] else 500)
    except Exception as e:
        logger.error("Unexpected error: %s", e)
        return jsonify({
            'success': False,
            'error': f'Server error: {str(e)}'
        }), 500
@app.route('/api/cookies', methods=['GET'])
def list_cookies():
    """Return every known cookie file along with the recorded status data."""
    return jsonify(cookie_manager.get_cookie_info()), 200
@app.route('/api/cookies/current', methods=['GET'])
def get_current_cookie():
    """Return the active cookie's name and whether its file is present."""
    active_name = cookie_manager.get_current_cookie_name()
    file_present = cookie_manager.get_current_cookie() is not None
    body = {
        'current_cookie': active_name,
        'cookie_exists': file_present,
    }
    return jsonify(body), 200
@app.route('/api/cookies/switch', methods=['POST'])
def switch_cookie():
    """Switch to a specific cookie.

    Expects a JSON object body ``{"cookie_name": "<file name>"}``. Responds
    400 when the name is missing or is not a bare file name, 404 when the
    cookie file does not exist, and 200 once the cookie has been selected.
    """
    # silent=True: a missing or malformed body becomes None, not an error.
    data = request.get_json(silent=True)
    cookie_name = data.get('cookie_name') if isinstance(data, dict) else None

    if not cookie_name or not isinstance(cookie_name, str):
        return jsonify({
            'success': False,
            'error': 'cookie_name is required'
        }), 400

    # Only bare file names are accepted, so a value such as '../secret'
    # cannot select a file outside the cookies directory.
    if cookie_name in ('.', '..') or os.path.basename(cookie_name) != cookie_name:
        return jsonify({
            'success': False,
            'error': 'Invalid cookie_name'
        }), 400

    if not cookie_manager.set_current_cookie(cookie_name):
        return jsonify({
            'success': False,
            'error': f'Cookie file not found: {cookie_name}'
        }), 404

    return jsonify({
        'success': True,
        'message': f'Switched to cookie: {cookie_name}',
        'current_cookie': cookie_name
    }), 200
@app.route('/api/cookies/rotate', methods=['POST'])
def rotate_cookie():
    """Advance the cookie manager to the next cookie and report the outcome."""
    rotated, outcome = cookie_manager.rotate_cookie()

    # On failure `outcome` carries the manager's explanation.
    if not rotated:
        return jsonify({'success': False, 'error': outcome}), 400

    # On success `outcome` is the name of the newly active cookie.
    body = {
        'success': True,
        'message': f'Rotated to cookie: {outcome}',
        'current_cookie': outcome,
    }
    return jsonify(body), 200
@app.route('/api/cookies/upload', methods=['POST'])
def upload_cookie():
    """Upload a new cookie file.

    Expects multipart form data with the file under the ``file`` field. The
    file is stored in ``COOKIES_DIR`` under its base name, with ``.txt``
    appended when missing. Responds 400 for a missing, empty or invalid file
    name, 500 when saving fails, and 200 on success.
    """
    if 'file' not in request.files:
        return jsonify({
            'success': False,
            'error': 'No file provided'
        }), 400

    upload = request.files['file']

    if not upload.filename:
        return jsonify({
            'success': False,
            'error': 'No file selected'
        }), 400

    # The client controls the file name: keep only its final path component
    # so a name like '../../app.py' cannot write outside the cookies dir.
    filename = os.path.basename(upload.filename.replace('\\', '/'))

    # Dot-files are rejected too: '*.txt' globbing would never list them.
    if not filename or filename.startswith('.'):
        return jsonify({
            'success': False,
            'error': 'Invalid filename'
        }), 400

    # Ensure .txt extension
    if not filename.endswith('.txt'):
        filename += '.txt'

    try:
        # Save file
        upload.save(os.path.join(COOKIES_DIR, filename))

        # If this is the first cookie, set it as current
        if not cookie_manager.get_current_cookie():
            cookie_manager.set_current_cookie(filename)

        return jsonify({
            'success': True,
            'message': f'Cookie uploaded successfully: {filename}',
            'filename': filename,
            'total_cookies': len(cookie_manager.list_cookies())
        }), 200
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Failed to upload cookie: {str(e)}'
        }), 500
@app.route('/api/cookies/delete', methods=['DELETE'])
def delete_cookie():
    """Delete a cookie file.

    Expects a JSON object body ``{"cookie_name": "<file name>"}``. Responds
    400 when the name is missing or invalid or when it is the only available
    cookie, 404 when the file does not exist, 500 when removal fails, and
    200 on success. If the active cookie is deleted, the manager is rotated
    to another cookie first.
    """
    # silent=True: a missing or malformed body becomes None, not an error.
    data = request.get_json(silent=True)
    cookie_name = data.get('cookie_name') if isinstance(data, dict) else None

    if not cookie_name or not isinstance(cookie_name, str):
        return jsonify({
            'success': False,
            'error': 'cookie_name is required'
        }), 400

    cookie_path = os.path.join(COOKIES_DIR, cookie_name)

    # Only bare file names are accepted, so '../x' cannot delete files
    # outside the cookies directory; the manager's own state file is
    # protected as well.
    is_bare_name = (
        cookie_name not in ('.', '..')
        and os.path.basename(cookie_name) == cookie_name
    )
    is_state_file = (
        os.path.abspath(cookie_path) == os.path.abspath(COOKIE_STATE_FILE)
    )
    if not is_bare_name or is_state_file:
        return jsonify({
            'success': False,
            'error': 'Invalid cookie_name'
        }), 400

    if not os.path.isfile(cookie_path):
        return jsonify({
            'success': False,
            'error': f'Cookie not found: {cookie_name}'
        }), 404

    # Don't allow deleting the current cookie if it's the only one
    if cookie_manager.get_current_cookie_name() == cookie_name:
        cookies = cookie_manager.list_cookies()
        if len(cookies) == 1:
            return jsonify({
                'success': False,
                'error': 'Cannot delete the only available cookie'
            }), 400
        # Rotate to another cookie first
        cookie_manager.rotate_cookie()

    try:
        os.remove(cookie_path)
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Failed to delete cookie: {str(e)}'
        }), 500

    return jsonify({
        'success': True,
        'message': f'Cookie deleted: {cookie_name}',
        'total_cookies': len(cookie_manager.list_cookies())
    }), 200
@app.route('/', methods=['GET'])
def home():
    """Serve a JSON overview of the API: its endpoints and cookie operations."""
    # One-line summary per route.
    endpoint_summaries = {
        '/health': 'GET - Health check',
        '/api/audio': 'GET/POST - Get audio stream URL',
        '/api/cookies': 'GET - List all cookies',
        '/api/cookies/current': 'GET - Get current active cookie',
        '/api/cookies/switch': 'POST - Switch to specific cookie',
        '/api/cookies/rotate': 'POST - Rotate to next cookie',
        '/api/cookies/upload': 'POST - Upload new cookie file',
        '/api/cookies/delete': 'DELETE - Delete a cookie file',
    }
    # Example invocations for the cookie-management routes.
    cookie_examples = {
        'list_cookies': 'GET /api/cookies',
        'switch_cookie': 'POST /api/cookies/switch {"cookie_name": "cookies1.txt"}',
        'rotate_cookie': 'POST /api/cookies/rotate',
        'upload_cookie': 'POST /api/cookies/upload (multipart/form-data)',
        'delete_cookie': 'DELETE /api/cookies/delete {"cookie_name": "cookies1.txt"}',
    }
    overview = {
        'name': 'YouTube Audio API with Cookie Management',
        'version': '2.0',
        'endpoints': endpoint_summaries,
        'cookie_management': cookie_examples,
    }
    return jsonify(overview)
if __name__ == '__main__':
    # Honor the PORT environment variable (it used to be read and then
    # ignored). The default stays 7860, the port this app has always
    # actually listened on.
    port = int(os.environ.get('PORT', 7860))
    # The Werkzeug debugger allows arbitrary code execution, so it must not
    # be on by default while listening on all interfaces. Opt in explicitly
    # with FLASK_DEBUG=1 for local development.
    debug = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=port, debug=debug)