File size: 3,726 Bytes
b6ac700
 
 
 
 
 
 
922fe2a
 
 
b6ac700
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4c650d7
b6ac700
 
 
 
 
3ab1530
b6ac700
 
5b0aac0
 
b6ac700
 
 
 
 
 
 
 
922fe2a
 
b6ac700
922fe2a
4fbf19d
 
 
 
 
 
 
b6ac700
4fbf19d
b6ac700
4fbf19d
 
 
b6ac700
4fbf19d
 
b6ac700
4fbf19d
 
b6ac700
922fe2a
 
b6ac700
 
922fe2a
b6ac700
 
 
 
 
 
 
922fe2a
b6ac700
 
 
 
 
922fe2a
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from tempfile import mkdtemp
from typing import List
from yt_dlp import YoutubeDL

import yt_dlp
from yt_dlp.postprocessor import PostProcessor

import io
from contextlib import redirect_stderr

class FilenameCollectorPP(PostProcessor):
    def __init__(self):
        super(FilenameCollectorPP, self).__init__(None)
        self.filenames = []

    def run(self, information):
        self.filenames.append(information["filepath"])
        return [], information

def download_url(url: str, maxDuration: int = None, destinationDirectory: str = None, playlistItems: str = "1") -> List[str]: 
    try:
        return _perform_download(url, maxDuration=maxDuration, outputTemplate=None, destinationDirectory=destinationDirectory, playlistItems=playlistItems)
    except yt_dlp.utils.DownloadError as e:
        # In case of an OS error, try again with a different output template
        if e.msg and e.msg.find("[Errno 36] File name too long") >= 0:
            return _perform_download(url, maxDuration=maxDuration, outputTemplate="%(title).10s %(id)s.%(ext)s")
        pass

def _perform_download(url: str, maxDuration: int = None, outputTemplate: str = None, destinationDirectory: str = None, playlistItems: str = "1", onlyAudio: bool = False):
    # Create a temporary directory to store the downloaded files
    if destinationDirectory is None:
        destinationDirectory = mkdtemp()

    ydl_opts = {
        "format": "bestaudio/best" if onlyAudio else "bestvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/best",
        'paths': {
            'home': destinationDirectory
        },
        "ignoreerrors": True
    }
    if (playlistItems):
        ydl_opts['playlist_items'] = playlistItems

    # Add output template if specified
    if outputTemplate:
        ydl_opts['outtmpl'] = outputTemplate

    errStrIO = EventStringIO(on_write=lambda text: print(f"\033[91m{text}\033[0m"))
    
    filename_collector = FilenameCollectorPP()
    with redirect_stderr(errStrIO):
        for _ in (True,):
            with YoutubeDL(ydl_opts) as ydl:
                if maxDuration and maxDuration > 0:
                    info = ydl.extract_info(url, download=False)
                    if not info: break
                
                    entries = "entries" in info and info["entries"] or [info]

                    total_duration = 0

                    # Compute total duration
                    for entry in entries:
                        if entry: total_duration += float(entry["duration"])

                    if total_duration >= maxDuration:
                        raise ExceededMaximumDuration(videoDuration=total_duration, maxDuration=maxDuration, message="Video is too long")

                ydl.add_post_processor(filename_collector)
                ydl.download([url])

    errMsg = errStrIO.getvalue()
    errMsg = [text for text in errMsg.split("\n") if text.startswith("ERROR")] if errMsg else ""

    if len(filename_collector.filenames) <= 0:
        raise Exception(f"Cannot download {url}, " + "\n".join(errMsg) if errMsg else "")

    result = []

    for filename in filename_collector.filenames:
        result.append(filename)
        print("Downloaded " + filename)

    return result

class ExceededMaximumDuration(Exception):
    def __init__(self, videoDuration, maxDuration, message):
        self.videoDuration = videoDuration
        self.maxDuration = maxDuration
        super().__init__(message)

class EventStringIO(io.StringIO):
    def __init__(self, on_write=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.on_write = on_write

    def write(self, text):
        super().write(text)
        if self.on_write:
            self.on_write(text)