from tempfile import mkdtemp from typing import List from yt_dlp import YoutubeDL import yt_dlp from yt_dlp.postprocessor import PostProcessor import io from contextlib import redirect_stderr class FilenameCollectorPP(PostProcessor): def __init__(self): super(FilenameCollectorPP, self).__init__(None) self.filenames = [] def run(self, information): self.filenames.append(information["filepath"]) return [], information def download_url(url: str, maxDuration: int = None, destinationDirectory: str = None, playlistItems: str = "1") -> List[str]: try: return _perform_download(url, maxDuration=maxDuration, outputTemplate=None, destinationDirectory=destinationDirectory, playlistItems=playlistItems) except yt_dlp.utils.DownloadError as e: # In case of an OS error, try again with a different output template if e.msg and e.msg.find("[Errno 36] File name too long") >= 0: return _perform_download(url, maxDuration=maxDuration, outputTemplate="%(title).10s %(id)s.%(ext)s") pass def _perform_download(url: str, maxDuration: int = None, outputTemplate: str = None, destinationDirectory: str = None, playlistItems: str = "1", onlyAudio: bool = False): # Create a temporary directory to store the downloaded files if destinationDirectory is None: destinationDirectory = mkdtemp() ydl_opts = { "format": "bestaudio/best" if onlyAudio else "bestvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/best", 'paths': { 'home': destinationDirectory }, "ignoreerrors": True } if (playlistItems): ydl_opts['playlist_items'] = playlistItems # Add output template if specified if outputTemplate: ydl_opts['outtmpl'] = outputTemplate errStrIO = EventStringIO(on_write=lambda text: print(f"\033[91m{text}\033[0m")) filename_collector = FilenameCollectorPP() with redirect_stderr(errStrIO): for _ in (True,): with YoutubeDL(ydl_opts) as ydl: if maxDuration and maxDuration > 0: info = ydl.extract_info(url, download=False) if not info: break entries = "entries" in info and info["entries"] or [info] total_duration = 0 # Compute total duration for entry in entries: if entry: total_duration += float(entry["duration"]) if total_duration >= maxDuration: raise ExceededMaximumDuration(videoDuration=total_duration, maxDuration=maxDuration, message="Video is too long") ydl.add_post_processor(filename_collector) ydl.download([url]) errMsg = errStrIO.getvalue() errMsg = [text for text in errMsg.split("\n") if text.startswith("ERROR")] if errMsg else "" if len(filename_collector.filenames) <= 0: raise Exception(f"Cannot download {url}, " + "\n".join(errMsg) if errMsg else "") result = [] for filename in filename_collector.filenames: result.append(filename) print("Downloaded " + filename) return result class ExceededMaximumDuration(Exception): def __init__(self, videoDuration, maxDuration, message): self.videoDuration = videoDuration self.maxDuration = maxDuration super().__init__(message) class EventStringIO(io.StringIO): def __init__(self, on_write=None, *args, **kwargs): super().__init__(*args, **kwargs) self.on_write = on_write def write(self, text): super().write(text) if self.on_write: self.on_write(text)