# Spaces:
# Build error
# Build error
from __future__ import unicode_literals

import logging
import os
import re
import shutil
import tempfile
import time
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Union

import yt_dlp
# Configuration
MAX_FILE_SIZE = 40 * 1024 ** 2  # 40 MB, expressed in bytes
FILE_TOO_LARGE_MESSAGE = (
    "The audio file exceeds the 40MB size limit. "
    "Please try a shorter video clip or select a lower quality option."
)
MAX_RETRIES = 3  # download attempts before giving up
RETRY_DELAY = 2  # seconds
DEFAULT_AUDIO_FORMAT = "mp3"
DEFAULT_AUDIO_QUALITY = "192"  # kbps
SUPPORTED_FORMATS = [
    "mp3",
    "m4a",
    "wav",
    "aac",
    "flac",
    "opus",
]

# Setup logging: configure the root handler once, then use a named module logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("youtube_downloader")
class DownloadLogger:
    """Logger object handed to yt-dlp that also relays progress lines to a callback.

    yt-dlp calls ``debug``, ``warning`` and ``error`` on the object passed as
    its ``logger`` option; each call is forwarded to the module logger.
    """

    def __init__(self, progress_callback: Optional[Callable[[str], None]] = None):
        # Substitute a no-op so ``debug`` can invoke the callback unconditionally.
        if not progress_callback:
            progress_callback = lambda _message: None
        self.progress_callback = progress_callback

    def debug(self, msg: str) -> None:
        # Only "[download]" lines that contain a percentage count as progress.
        is_progress_line = msg.startswith('[download]') and '%' in msg
        if is_progress_line:
            self.progress_callback(msg)
        logger.debug(msg)

    def warning(self, msg: str) -> None:
        logger.warning(msg)

    def error(self, msg: str) -> None:
        logger.error(msg)
class DownloadError(Exception):
    """Error raised by this module when a download or metadata lookup fails."""
def validate_url(url: str) -> bool:
    """
    Validate if the URL is a supported video platform URL

    The host part of the URL must be one of the supported platform domains or
    a subdomain of one (e.g. ``www.youtube.com``). Matching on the parsed host
    rather than on the whole string prevents a platform name that merely
    appears in the path, query or as part of another domain from passing.

    Args:
        url: URL to check; a missing scheme (``youtube.com/watch?v=...``) is accepted
    Returns:
        True if the URL points at a supported platform, False otherwise
        (including empty or non-string input)
    """
    from urllib.parse import urlsplit

    supported_domains = (
        'youtube.com',
        'youtu.be',
        'vimeo.com',
        'dailymotion.com',
        'twitch.tv',
        'soundcloud.com',
        'instagram.com',
    )

    if not isinstance(url, str):
        return False
    candidate = url.strip()
    if not candidate:
        return False

    # urlsplit only recognises a host after "scheme://" or a leading "//",
    # so scheme-less input is turned into a network-path reference first.
    if not re.match(r'[a-z][a-z0-9+.\-]*://', candidate, re.IGNORECASE):
        candidate = '//' + candidate.lstrip('/')

    try:
        host = urlsplit(candidate).hostname
    except ValueError:
        # Raised for malformed authorities such as unbalanced IPv6 brackets.
        return False
    if not host:
        return False

    host = host.lower().rstrip('.')
    return any(
        host == domain or host.endswith('.' + domain)
        for domain in supported_domains
    )
def ensure_download_directory(directory: str) -> str:
    """
    Create the download directory if needed and report where it is

    Args:
        directory: Directory path, relative or absolute
    Returns:
        Absolute path of the directory as a string
    """
    target = Path(directory).absolute()
    # Missing parents are created too; an already existing directory is fine.
    target.mkdir(parents=True, exist_ok=True)
    return str(target)
def get_download_options(
    output_dir: str = "./downloads/audio",
    audio_format: str = DEFAULT_AUDIO_FORMAT,
    audio_quality: str = DEFAULT_AUDIO_QUALITY,
    progress_callback: Optional[Callable[[str], None]] = None
) -> Dict[str, Any]:
    """
    Get yt-dlp download options with specified parameters

    Args:
        output_dir: Directory to save downloaded files (created if missing)
        audio_format: Audio format (mp3, m4a, wav, etc.); matched
            case-insensitively, unsupported values fall back to the default
        audio_quality: Audio quality in kbps
        progress_callback: Function to call with progress updates
    Returns:
        Dictionary of yt-dlp options
    """
    # Accept "MP3" / " m4a " style input instead of silently switching format.
    requested_format = str(audio_format).strip().lower()
    if requested_format not in SUPPORTED_FORMATS:
        logger.warning(f"Unsupported format '{audio_format}', falling back to {DEFAULT_AUDIO_FORMAT}")
        requested_format = DEFAULT_AUDIO_FORMAT

    # Ensure download directory exists
    output_dir = ensure_download_directory(output_dir)
    # "%" introduces a field in yt-dlp output templates, so a literal "%" in
    # the directory name has to be doubled to survive template expansion.
    template_dir = output_dir.replace("%", "%%")

    return {
        "format": "bestaudio/best",
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": requested_format,
            "preferredquality": audio_quality,
        }],
        "logger": DownloadLogger(progress_callback),
        "outtmpl": f"{template_dir}/%(title)s.%(ext)s",
        "noplaylist": True,
        "quiet": False,
        "no_warnings": False,
        # Bind the caller's callback into the hook yt-dlp invokes with its status dict.
        "progress_hooks": [lambda d: download_progress_hook(d, progress_callback)],
        "overwrites": True,
    }
# Matches ANSI CSI sequences such as "\x1b[0;94m" (terminal colour codes).
_ANSI_ESCAPE_RE = re.compile(r'\x1b\[[0-9;]*[A-Za-z]')


def _clean_progress_field(value: Any, fallback: str) -> str:
    """Return a display-ready progress field, or the fallback if it is missing or empty."""
    if value is None:
        return fallback
    cleaned = _ANSI_ESCAPE_RE.sub('', str(value)).strip()
    return cleaned or fallback


def download_progress_hook(d: Dict[str, Any], callback: Optional[Callable[[str], None]] = None) -> None:
    """
    Hook for tracking download progress

    Translates a yt-dlp progress dictionary into a human-readable message.
    The preformatted ``_percent_str`` / ``_speed_str`` / ``_eta_str`` values
    are stripped of padding and terminal colour codes before use, because the
    message is intended for a callback rather than a terminal.
    NOTE(review): colour codes are reported for some yt-dlp versions; confirm
    against the version in use.

    Args:
        d: Download information dictionary
        callback: Function to call with progress updates
    """
    if callback is None:
        callback = lambda message: None

    status = d.get('status')
    if status == 'downloading':
        progress = _clean_progress_field(d.get('_percent_str'), 'unknown progress')
        speed = _clean_progress_field(d.get('_speed_str'), 'unknown speed')
        eta = _clean_progress_field(d.get('_eta_str'), 'unknown ETA')
        callback(f"Downloading: {progress} at {speed}, ETA: {eta}")
    elif status == 'finished':
        # Tolerate a missing filename rather than failing inside yt-dlp's hook call.
        filepath = d.get('filename') or ''
        callback(f"Download complete: {os.path.basename(filepath)}")
        logger.info(f"Download finished: {filepath}")
def estimate_file_size(info: Dict[str, Any]) -> int:
    """
    Better estimate file size from video info

    Looks for an exact size, then an approximate size, and finally derives one
    from bitrate and duration. When nothing usable is present the limit itself
    (``MAX_FILE_SIZE``) is returned.

    Args:
        info: Video information dictionary
    Returns:
        Estimated file size in bytes
    """
    if not info:
        return MAX_FILE_SIZE

    # Try different fields that might contain size information
    for key in ("filesize", "filesize_approx"):
        size = info.get(key)
        if size is not None:
            # Approximate sizes may be floats; the contract is whole bytes.
            return int(size)

    # If we have duration and a bitrate, we can estimate
    duration = info.get("duration")
    bitrate = info.get("abr") or info.get("tbr")
    if duration and bitrate:
        # yt-dlp documents abr/tbr in kbps of 1000 bits/sec:
        # bytes = kbps * 1000 / 8 * seconds = kbps * 125 * seconds
        return int(bitrate * duration * 125)

    # Default to a reasonable upper limit if we can't determine
    return MAX_FILE_SIZE
def download_video_audio(
    url: str,
    output_dir: str = "./downloads/audio",
    audio_format: str = DEFAULT_AUDIO_FORMAT,
    audio_quality: str = DEFAULT_AUDIO_QUALITY,
    progress_callback: Optional[Callable[[str], None]] = None
) -> Optional[str]:
    """
    Download audio from a video URL

    The URL is validated, the size is estimated from metadata before anything
    is downloaded, and transient failures are retried up to ``MAX_RETRIES``
    times. Errors raised deliberately by this module (unsupported URL, file
    too large) are never retried.

    Args:
        url: URL of the video
        output_dir: Directory to save downloaded files
        audio_format: Audio format (mp3, m4a, wav, etc.)
        audio_quality: Audio quality in kbps
        progress_callback: Function to call with progress updates
    Returns:
        Path to the downloaded audio file, or None only if no attempt was
        made (``MAX_RETRIES`` < 1)
    Raises:
        DownloadError: If the URL is unsupported, the estimated size exceeds
            the limit, or the download fails after all retries
    """
    if not validate_url(url):
        error_msg = f"Invalid or unsupported URL: {url}"
        logger.error(error_msg)
        raise DownloadError(error_msg)

    notify = progress_callback if progress_callback else (lambda message: None)

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            notify(f"Starting download (attempt {attempt}/{MAX_RETRIES})...")
            ydl_opts = get_download_options(output_dir, audio_format, audio_quality, progress_callback)
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                logger.info(f"Downloading audio from: {url}")
                # Extract info first without downloading
                info = ydl.extract_info(url, download=False)

                # Better file size estimation
                estimated_size = estimate_file_size(info)
                if estimated_size > MAX_FILE_SIZE:
                    error_msg = f"{FILE_TOO_LARGE_MESSAGE} (Estimated: {estimated_size / 1024 / 1024:.1f}MB)"
                    logger.error(error_msg)
                    raise DownloadError(error_msg)

                # Now download
                ydl.download([url])

                # The template filename carries the source extension; the
                # audio extraction step replaces it with the target format.
                base_filename = os.path.splitext(ydl.prepare_filename(info))[0]

            # Prefer the requested format, then fall back to any supported
            # extension (the extractor may pick a different container).
            for ext in [audio_format, *SUPPORTED_FORMATS]:
                candidate = f"{base_filename}.{ext}"
                if os.path.exists(candidate):
                    return candidate

            # If we get here, something went wrong
            raise FileNotFoundError(f"Could not locate downloaded file for {url}")
        except DownloadError:
            # Our own verdicts (e.g. file too large) are final: retrying
            # cannot change them and would only add delay.
            raise
        except Exception as e:
            is_ytdlp_error = isinstance(e, yt_dlp.utils.DownloadError)
            label = "Download error" if is_ytdlp_error else "Unexpected error"
            error_msg = f"{label} (Attempt {attempt}/{MAX_RETRIES}): {str(e)}"
            logger.error(error_msg)
            notify(error_msg)

            # Check exhaustion before sleeping so the last failure always
            # raises, including rate-limit (429) failures.
            if attempt >= MAX_RETRIES:
                raise DownloadError(f"Failed to download after {MAX_RETRIES} attempts: {str(e)}") from e

            if is_ytdlp_error and "HTTP Error 429" in str(e):
                # Rate limiting - wait longer
                time.sleep(RETRY_DELAY * 5)
            else:
                time.sleep(RETRY_DELAY)

    return None
def delete_download(path: str) -> bool:
    """
    Delete a downloaded file or directory

    Symbolic links are removed themselves; their targets are left untouched.
    Failures are logged and reported through the return value, never raised.

    Args:
        path: Path to file or directory to delete
    Returns:
        True if deletion was successful, False otherwise
    """
    try:
        # lexists also sees dangling symlinks, which os.path.exists reports as missing.
        if not path or not os.path.lexists(path):
            logger.warning(f"Path does not exist: {path}")
            return False

        if os.path.islink(path):
            # rmtree refuses symlinks, and following one would delete the
            # target's contents; remove only the link.
            # NOTE(review): directory links on Windows may need os.rmdir — confirm if relevant.
            os.remove(path)
            logger.info(f"Symlink deleted: {path}")
        elif os.path.isfile(path):
            os.remove(path)
            logger.info(f"File deleted: {path}")
        elif os.path.isdir(path):
            shutil.rmtree(path)
            logger.info(f"Directory deleted: {path}")
        else:
            logger.warning(f"Path is neither a file nor a directory: {path}")
            return False
        return True
    except PermissionError:
        logger.error(f"Permission denied: Unable to delete {path}")
    except FileNotFoundError:
        # The path vanished between the existence check and the removal.
        logger.error(f"File or directory not found: {path}")
    except Exception as e:
        logger.error(f"Error deleting {path}: {str(e)}")
    return False
def trim_audio_file(input_file: str, max_duration_seconds: int = 600) -> str:
    """
    Trim an audio file to a maximum duration to reduce file size

    The trimmed copy is written next to the input as ``<name>_trimmed<ext>``
    using a stream copy (no re-encoding). Trimming is best-effort: on any
    failure the error is logged and the original path is returned.

    Args:
        input_file: Path to input audio file
        max_duration_seconds: Maximum duration in seconds
    Returns:
        Path to trimmed file, or ``input_file`` if trimming failed
    """
    output_file = None
    output_preexisted = False
    try:
        # Imported lazily so the rest of the module works without this optional package.
        import ffmpeg

        # Create output filename
        file_dir = os.path.dirname(input_file)
        file_name, file_ext = os.path.splitext(os.path.basename(input_file))
        output_file = os.path.join(file_dir, f"{file_name}_trimmed{file_ext}")
        output_preexisted = os.path.exists(output_file)

        # Trim using ffmpeg
        ffmpeg.input(input_file).output(
            output_file, t=str(max_duration_seconds), acodec='copy'
        ).run(quiet=True, overwrite_output=True)

        logger.info(f"Trimmed {input_file} to {max_duration_seconds} seconds")
        return output_file
    except Exception as e:
        logger.error(f"Error trimming audio: {str(e)}")

        # The binding's error message is generic; if it captured the ffmpeg
        # process's stderr, log that too since it holds the real cause.
        stderr = getattr(e, "stderr", None)
        if stderr:
            if isinstance(stderr, bytes):
                stderr = stderr.decode("utf-8", errors="replace")
            logger.error(f"ffmpeg output: {str(stderr).strip()}")

        # Don't leave a truncated file behind, but never delete an output
        # that already existed before this call.
        if output_file and not output_preexisted and os.path.exists(output_file):
            try:
                os.remove(output_file)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove partial file {output_file}: {cleanup_error}")

        return input_file  # Return original if trimming fails
def get_video_info(url: str) -> Dict[str, Any]:
    """
    Get information about a video without downloading

    Args:
        url: URL of the video
    Returns:
        Dictionary of video information
    Raises:
        DownloadError: If the information cannot be retrieved
    """
    try:
        with yt_dlp.YoutubeDL({"quiet": True}) as ydl:
            info = ydl.extract_info(url, download=False)
    except Exception as e:
        logger.error(f"Error getting video info: {str(e)}")
        # Keep the original exception attached as the cause for debugging.
        raise DownloadError(f"Could not retrieve video information: {str(e)}") from e

    # Guard the declared return type: callers index into the result.
    # NOTE(review): extract_info presumably only returns None under
    # non-default options; confirm against the yt-dlp version in use.
    if info is None:
        error_msg = f"Could not retrieve video information: no data returned for {url}"
        logger.error(error_msg)
        raise DownloadError(error_msg)

    return info