"""Implements a simple wrapper around urlopen."""
import http.client
import json
import logging
import re
import socket
from functools import lru_cache
from urllib import parse
from urllib.error import URLError
from urllib.request import Request, urlopen

from pytube.exceptions import RegexMatchError, MaxRetriesExceeded
from pytube.helpers import regex_search

logger = logging.getLogger(__name__)
default_range_size = 9437184  # 9MB


def _execute_request(
    url,
    method=None,
    headers=None,
    data=None,
    timeout=socket._GLOBAL_DEFAULT_TIMEOUT
):
    base_headers = {"User-Agent": "Mozilla/5.0", "accept-language": "en-US,en"}
    if headers:
        base_headers.update(headers)
    if data:
        # Encode the request body as UTF-8 JSON bytes unless already bytes.
        if not isinstance(data, bytes):
            data = bytes(json.dumps(data), encoding="utf-8")
    if url.lower().startswith("http"):
        request = Request(url, headers=base_headers, method=method, data=data)
    else:
        raise ValueError("Invalid URL")
    return urlopen(request, timeout=timeout)
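
# Minimal usage sketch for this private helper (illustrative only; the URL is
# a placeholder and network access is assumed):
#
#     response = _execute_request("https://www.example.com", method="GET")
#     body = response.read()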


def get(url, extra_headers=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    """Send an http GET request.

    :param str url:
        The URL to perform the GET request for.
    :param dict extra_headers:
        Extra headers to add to the request
    :param timeout:
        Request timeout in seconds (defaults to the global socket timeout).
    :rtype: str
    :returns:
        UTF-8 encoded string of response
    """
    if extra_headers is None:
        extra_headers = {}
    response = _execute_request(url, headers=extra_headers, timeout=timeout)
    return response.read().decode("utf-8")
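
# Usage sketch (illustrative; assumes network access and a reachable URL):
#
#     html = get("https://www.example.com")
#     print(html[:60])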


def post(url, extra_headers=None, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    """Send an http POST request.

    :param str url:
        The URL to perform the POST request for.
    :param dict extra_headers:
        Extra headers to add to the request
    :param dict data:
        The data to send on the POST request
    :param timeout:
        Request timeout in seconds (defaults to the global socket timeout).
    :rtype: str
    :returns:
        UTF-8 encoded string of response
    """
    if extra_headers is None:
        extra_headers = {}
    if data is None:
        data = {}

    # The body is always JSON-encoded, so set the Content-Type header
    # accordingly. Build a new dict so the caller's headers are not mutated.
    extra_headers = {**extra_headers, "Content-Type": "application/json"}
    response = _execute_request(
        url,
        headers=extra_headers,
        data=data,
        timeout=timeout
    )
    return response.read().decode("utf-8")
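
# Usage sketch (illustrative; the endpoint is a placeholder). The payload
# dict is JSON-encoded by _execute_request before being sent:
#
#     reply = post("https://www.example.com/api", data={"key": "value"})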


def seq_stream(
    url,
    timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
    max_retries=0
):
    """Read the response in sequence.

    :param str url: The URL to perform the GET request for.
    :param int max_retries: Number of retries for each range request.
    :rtype: Iterable[bytes]
    """
    split_url = parse.urlsplit(url)
    # split_url.path already carries its leading slash, so it is joined to
    # the netloc without an extra separator.
    base_url = '%s://%s%s?' % (split_url.scheme, split_url.netloc, split_url.path)

    querys = dict(parse.parse_qsl(split_url.query))

    # The 0th sequential request provides the file headers, which tell us
    # information about how the file is segmented.
    querys['sq'] = 0
    url = base_url + parse.urlencode(querys)

    segment_data = b''
    for chunk in stream(url, timeout=timeout, max_retries=max_retries):
        yield chunk
        segment_data += chunk

    # Parse the headers of the 0th segment to find the number of segments.
    segment_count = 0
    stream_info = segment_data.split(b'\r\n')
    segment_count_pattern = re.compile(b'Segment-Count: (\\d+)')
    for line in stream_info:
        match = segment_count_pattern.search(line)
        if match:
            segment_count = int(match.group(1).decode('utf-8'))
    if segment_count == 0:
        raise RegexMatchError('seq_stream', segment_count_pattern.pattern)

    # Request the remaining segments sequentially to build the file.
    seq_num = 1
    while seq_num <= segment_count:
        querys['sq'] = seq_num
        url = base_url + parse.urlencode(querys)

        yield from stream(url, timeout=timeout, max_retries=max_retries)
        seq_num += 1
    return
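
# Usage sketch (illustrative; "seq_url" stands in for a segmented stream URL
# that serves numbered fragments via the "sq" query parameter):
#
#     with open("video.part", "wb") as fh:
#         for segment in seq_stream(seq_url):
#             fh.write(segment)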


def stream(
    url,
    timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
    max_retries=0
):
    """Read the response in chunks.

    :param str url: The URL to perform the GET request for.
    :param int max_retries: Number of retries for each range request.
    :rtype: Iterable[bytes]
    """
    # Start with a fake file size; the real size is read from the first
    # Content-Range response header below.
    file_size: int = default_range_size
    downloaded = 0
    while downloaded < file_size:
        stop_pos = min(downloaded + default_range_size, file_size) - 1
        range_header = f"bytes={downloaded}-{stop_pos}"
        tries = 0

        # Attempt the request for this range, retrying as necessary.
        while True:
            # If the maximum number of retries is exceeded, give up.
            if tries >= 1 + max_retries:
                raise MaxRetriesExceeded()
            try:
                response = _execute_request(
                    url,
                    method="GET",
                    headers={"Range": range_header},
                    timeout=timeout
                )
            except URLError as e:
                # Swallow only socket timeouts; re-raise any other URLError.
                if isinstance(e.reason, socket.timeout):
                    pass
                else:
                    raise
            except http.client.IncompleteRead:
                # Allow retries on IncompleteRead errors from flaky connections.
                pass
            else:
                # The request succeeded, so stop retrying.
                break
            tries += 1

        if file_size == default_range_size:
            try:
                # Query the true file size from the Content-Range header,
                # e.g. "bytes 0-9437183/1234567890".
                content_range = response.info()["Content-Range"]
                file_size = int(content_range.split("/")[1])
            except (KeyError, IndexError, ValueError) as e:
                logger.error(e)
        while True:
            chunk = response.read()
            if not chunk:
                break
            downloaded += len(chunk)
            yield chunk
    return
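
# Usage sketch (illustrative; "media_url" stands in for a direct stream URL):
#
#     with open("audio.part", "wb") as fh:
#         for chunk in stream(media_url, max_retries=2):
#             fh.write(chunk)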


@lru_cache()
def filesize(url):
    """Fetch size in bytes of file at given URL

    :param str url: The URL to get the size of
    :returns: int: size in bytes of remote file
    """
    return int(head(url)["content-length"])
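
# Usage sketch (illustrative; issues a single HEAD request, and the result
# is memoized per URL by lru_cache):
#
#     size_bytes = filesize("https://www.example.com/file.bin")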


@lru_cache()
def seq_filesize(url):
    """Fetch size in bytes of file at given URL from sequential requests

    :param str url: The URL to get the size of
    :returns: int: size in bytes of remote file
    """
    total_filesize = 0
    split_url = parse.urlsplit(url)
    # As in seq_stream, the path already carries its leading slash.
    base_url = '%s://%s%s?' % (split_url.scheme, split_url.netloc, split_url.path)
    querys = dict(parse.parse_qsl(split_url.query))

    # The 0th sequential request provides the file headers, which tell us
    # information about how the file is segmented.
    querys['sq'] = 0
    url = base_url + parse.urlencode(querys)
    response = _execute_request(
        url, method="GET"
    )

    response_value = response.read()
    # The file header must be counted toward the total filesize.
    total_filesize += len(response_value)

    # Parse the header of the 0th segment to find the number of segments.
    segment_count = 0
    stream_info = response_value.split(b'\r\n')
    segment_regex = b'Segment-Count: (\\d+)'
    for line in stream_info:
        # One of these lines carries the segment count, but we do not know
        # which in advance, so try each in turn.
        try:
            segment_count = int(regex_search(segment_regex, line, 1))
        except RegexMatchError:
            pass

    if segment_count == 0:
        raise RegexMatchError('seq_filesize', segment_regex)

    # Sum the content-length of each remaining segment via HEAD requests.
    seq_num = 1
    while seq_num <= segment_count:
        querys['sq'] = seq_num
        url = base_url + parse.urlencode(querys)

        total_filesize += int(head(url)['content-length'])
        seq_num += 1
    return total_filesize
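
# Usage sketch (illustrative; "seq_url" stands in for a segmented stream URL
# as in seq_stream; the result is memoized per URL by lru_cache):
#
#     total_bytes = seq_filesize(seq_url)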


def head(url):
    """Fetch headers returned by an http HEAD request.

    :param str url:
        The URL to perform the HEAD request for.
    :rtype: dict
    :returns:
        dictionary of lowercase headers
    """
    response_headers = _execute_request(url, method="HEAD").info()
    return {k.lower(): v for k, v in response_headers.items()}
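
# Usage sketch (illustrative; assumes a reachable URL):
#
#     headers = head("https://www.example.com")
#     content_type = headers.get("content-type")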