|
from tempfile import mkdtemp |
|
from typing import List |
|
from yt_dlp import YoutubeDL |
|
|
|
import yt_dlp |
|
from yt_dlp.postprocessor import PostProcessor |
|
|
|
import io |
|
from contextlib import redirect_stderr |
|
|
|
class FilenameCollectorPP(PostProcessor):
    """Post-processor that records the "filepath" of every info dict it is given.

    The collected paths are available, in the order they were seen, in
    ``self.filenames``.
    """

    def __init__(self):
        # The base initializer receives None as its only argument.
        # NOTE(review): presumably this is the downloader slot, filled in later
        # when the instance is registered on a YoutubeDL object - confirm.
        super().__init__(None)
        # Paths gathered so far, one per run() call.
        self.filenames = []

    def run(self, information):
        """Remember the file path of ``information`` and pass it through.

        Args:
            information: mapping that must contain a "filepath" key.

        Returns:
            A 2-tuple of an empty list and the unmodified ``information``.
            NOTE(review): presumably yt-dlp's (files_to_delete, info)
            convention - confirm against the PostProcessor documentation.

        Raises:
            KeyError: if ``information`` has no "filepath" entry.
        """
        path = information["filepath"]
        self.filenames.append(path)
        return [], information
|
|
|
def download_url(url: str, maxDuration: int = None, destinationDirectory: str = None, playlistItems: str = "1") -> List[str]:
    """Download ``url`` and return the paths of the downloaded files.

    The work is delegated to ``_perform_download``. If that raises a
    ``yt_dlp.utils.DownloadError`` whose message contains
    "[Errno 36] File name too long", the download is retried once with an
    output template that truncates the title to 10 characters.

    Args:
        url: address to download.
        maxDuration: forwarded to ``_perform_download`` as ``maxDuration``.
        destinationDirectory: forwarded to ``_perform_download``; used for
            both the first attempt and the retry.
        playlistItems: forwarded to ``_perform_download``; used for both the
            first attempt and the retry.

    Returns:
        The list returned by ``_perform_download``.

    Raises:
        yt_dlp.utils.DownloadError: for any download error other than the
            "file name too long" case. Such errors used to be swallowed,
            making the function return None despite its declared return type.
    """
    try:
        return _perform_download(
            url,
            maxDuration=maxDuration,
            outputTemplate=None,
            destinationDirectory=destinationDirectory,
            playlistItems=playlistItems,
        )
    except yt_dlp.utils.DownloadError as e:
        if e.msg and "[Errno 36] File name too long" in e.msg:
            # Retry with a short file name, keeping the caller's destination
            # directory and playlist selection (the retry used to drop both).
            return _perform_download(
                url,
                maxDuration=maxDuration,
                outputTemplate="%(title).10s %(id)s.%(ext)s",
                destinationDirectory=destinationDirectory,
                playlistItems=playlistItems,
            )
        # Unrelated download failure: surface it instead of returning None.
        raise
|
|
|
def _perform_download(url: str, maxDuration: int = None, outputTemplate: str = None, destinationDirectory: str = None, playlistItems: str = "1", onlyAudio: bool = False):
    """Download ``url`` with yt-dlp and return the paths of the files written.

    Args:
        url: address handed to ``YoutubeDL``.
        maxDuration: when truthy and greater than 0, the metadata is fetched
            first and the summed "duration" of all entries is compared with
            this value before anything is downloaded.
        outputTemplate: when truthy, set as the "outtmpl" option.
        destinationDirectory: used as the "home" path; a fresh directory from
            ``mkdtemp()`` is created when None.
        playlistItems: when truthy, set as the "playlist_items" option.
        onlyAudio: selects the "bestaudio/best" format instead of the
            mp4/avc1 video + m4a audio selector.

    Returns:
        A list with the file paths gathered by ``FilenameCollectorPP``.

    Raises:
        ExceededMaximumDuration: if the summed duration is greater than or
            equal to ``maxDuration``.
        Exception: if no file path was collected. The message always names
            the URL and appends any captured lines that start with "ERROR".
    """
    if destinationDirectory is None:
        destinationDirectory = mkdtemp()

    ydl_opts = {
        "format": "bestaudio/best" if onlyAudio else "bestvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/best",
        "paths": {
            "home": destinationDirectory,
        },
        # NOTE(review): presumably makes yt-dlp report failures on stderr
        # instead of raising - confirm against the yt-dlp option docs.
        "ignoreerrors": True,
    }

    if playlistItems:
        ydl_opts["playlist_items"] = playlistItems

    if outputTemplate:
        ydl_opts["outtmpl"] = outputTemplate

    # Keeps everything written to stderr for the error report below and also
    # echoes each write to stdout wrapped in ANSI red.
    errStrIO = EventStringIO(on_write=lambda text: print(f"\033[91m{text}\033[0m"))
    filename_collector = FilenameCollectorPP()

    # stderr is redirected while YoutubeDL is both constructed and used.
    with redirect_stderr(errStrIO), YoutubeDL(ydl_opts) as ydl:
        should_download = True

        if maxDuration and maxDuration > 0:
            info = ydl.extract_info(url, download=False)

            if not info:
                # No metadata: skip the download; the empty result is
                # reported by the check after this block.
                should_download = False
            else:
                # A result with a non-empty "entries" value is summed per
                # entry; anything else is treated as a single item.
                entries = info.get("entries") or [info]

                # Falsy entries are skipped.
                # NOTE(review): an entry without a numeric "duration" raises
                # here (KeyError/TypeError) - confirm whether such entries
                # should be rejected or ignored.
                total_duration = sum(float(entry["duration"]) for entry in entries if entry)

                if total_duration >= maxDuration:
                    raise ExceededMaximumDuration(videoDuration=total_duration, maxDuration=maxDuration, message="Video is too long")

        if should_download:
            ydl.add_post_processor(filename_collector)
            ydl.download([url])

    error_lines = [line for line in errStrIO.getvalue().split("\n") if line.startswith("ERROR")]

    if not filename_collector.filenames:
        # Build the message explicitly: the former one-liner applied its
        # "if errMsg else ''" to the whole concatenation, producing an empty
        # exception message whenever no ERROR line had been captured.
        message = f"Cannot download {url}"
        if error_lines:
            message += ", " + "\n".join(error_lines)
        raise Exception(message)

    result = []

    for filename in filename_collector.filenames:
        result.append(filename)
        print("Downloaded " + filename)

    return result
|
|
|
class ExceededMaximumDuration(Exception):
    """Error carrying a measured duration and the limit it was compared with.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, videoDuration, maxDuration, message):
        """Store both durations and use ``message`` as the exception text."""
        Exception.__init__(self, message)
        # Duration value supplied by the raiser.
        self.videoDuration = videoDuration
        # Limit value supplied by the raiser.
        self.maxDuration = maxDuration
|
|
|
class EventStringIO(io.StringIO):
    """``io.StringIO`` that also reports every written chunk to a callback."""

    def __init__(self, on_write=None, *args, **kwargs):
        """Create the buffer.

        Args:
            on_write: optional callable invoked with the text of each
                ``write`` call, after the text has been stored.
            *args, **kwargs: forwarded unchanged to ``io.StringIO``.
        """
        super().__init__(*args, **kwargs)
        self.on_write = on_write

    def write(self, text):
        """Store ``text``, notify ``on_write`` if set, and return the count.

        Returns:
            The value returned by ``io.StringIO.write`` (the number of
            characters written). The override used to drop this value and
            return None, breaking the text-stream ``write`` contract.
        """
        written = super().write(text)
        if self.on_write:
            self.on_write(text)
        return written