tubifier / pytube /request.py
wldmr's picture
app file
837fdb6
raw
history blame
8.51 kB
"""Implements a simple wrapper around urlopen."""
import http.client
import json
import logging
import re
import socket
from functools import lru_cache
from urllib import parse
from urllib.error import URLError
from urllib.request import Request, urlopen
from pytube.exceptions import RegexMatchError, MaxRetriesExceeded
from pytube.helpers import regex_search
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Number of bytes asked for per ranged request in stream(); stream() also
# uses it as the placeholder file size until the real size is known.
# 9437184 == 9 * 1024 * 1024.
default_range_size = 9437184 # 9MB
def _execute_request(
    url,
    method=None,
    headers=None,
    data=None,
    timeout=socket._GLOBAL_DEFAULT_TIMEOUT
):
    """Build a urllib ``Request`` for ``url`` and open it.

    :param str url:
        Target URL; must start with ``http`` (case-insensitive).
    :param str method:
        HTTP method handed to ``Request``; ``None`` leaves the choice to
        urllib.
    :param dict headers:
        Extra headers merged over the default ``User-Agent`` and
        ``accept-language`` headers.
    :param data:
        Request body. ``bytes`` are sent unchanged; any other value that is
        not ``None`` is serialized with ``json.dumps`` and UTF-8 encoded.
    :param timeout:
        Timeout forwarded to ``urlopen``.
    :raises ValueError:
        If ``url`` does not start with ``http``.
    :returns:
        Whatever ``urlopen`` returns for the request.
    """
    # Refuse anything that is not http(s) (e.g. file://) before doing work.
    if not url.lower().startswith("http"):
        raise ValueError("Invalid URL")

    request_headers = {
        "User-Agent": "Mozilla/5.0",
        "accept-language": "en-US,en",
    }
    if headers:
        request_headers.update(headers)

    # Test against None, not truthiness: an empty payload such as {} must
    # still be serialized ("{}") rather than reach urllib as a raw dict.
    if data is not None and not isinstance(data, bytes):
        data = json.dumps(data).encode("utf-8")

    request = Request(url, headers=request_headers, method=method, data=data)
    return urlopen(request, timeout=timeout)  # nosec
def get(url, extra_headers=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    """Send an http GET request.

    :param str url:
        The URL to perform the GET request for.
    :param dict extra_headers:
        Extra headers to add to the request
    :param timeout:
        Timeout forwarded to the underlying request.
    :rtype: str
    :returns:
        The response body decoded as UTF-8
    """
    # The ``with`` block closes the response (and its socket) as soon as the
    # body has been read instead of leaving that to garbage collection.
    with _execute_request(
        url,
        headers=extra_headers or {},
        timeout=timeout
    ) as response:
        return response.read().decode("utf-8")
def post(url, extra_headers=None, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    """Send an http POST request.

    :param str url:
        The URL to perform the POST request for.
    :param dict extra_headers:
        Extra headers to add to the request. The mapping is copied, never
        modified.
    :param dict data:
        The data to send on the POST request; defaults to an empty dict.
    :param timeout:
        Timeout forwarded to the underlying request.
    :rtype: str
    :returns:
        The response body decoded as UTF-8
    """
    # could technically be implemented in get,
    # but to avoid confusion implemented like this
    if data is None:
        data = {}

    # Work on a copy so the caller's dict is not mutated.
    headers = dict(extra_headers or {})
    # required because the youtube servers are strict on content type
    # raises HTTPError [400]: Bad Request otherwise
    headers["Content-Type"] = "application/json"

    # Close the response once the body has been consumed.
    with _execute_request(
        url,
        headers=headers,
        data=data,
        timeout=timeout
    ) as response:
        return response.read().decode("utf-8")
def seq_stream(
    url,
    timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
    max_retries=0
):
    """Read the response in sequence.

    The ``sq=0`` request is streamed first; its body is both yielded and
    scanned for a ``Segment-Count: <n>`` line. Segments ``1..n`` are then
    requested in order and their chunks yielded.

    :param str url: The URL to perform the GET request for.
    :param timeout: Timeout forwarded to :func:`stream`.
    :param int max_retries: Retry budget forwarded to :func:`stream`.
    :rtype: Iterable[bytes]
    :raises RegexMatchError:
        If the ``sq=0`` response contains no ``Segment-Count`` line.
    """
    # YouTube expects a request sequence number as part of the parameters.
    split_url = parse.urlsplit(url)
    # ``path`` already starts with its own "/", so none is added here
    # (adding one would produce "host//path").
    base_url = f"{split_url.scheme}://{split_url.netloc}{split_url.path}?"
    querys = dict(parse.parse_qsl(split_url.query))

    # The 0th sequential request provides the file headers, which tell us
    # information about how the file is segmented.
    querys['sq'] = 0
    header_url = base_url + parse.urlencode(querys)

    # Collect the pieces in a list and join once instead of growing a bytes
    # object with += on every chunk.
    header_parts = []
    for chunk in stream(header_url, timeout=timeout, max_retries=max_retries):
        yield chunk
        header_parts.append(chunk)
    segment_data = b''.join(header_parts)

    # We can then parse the header to find the number of segments.
    # As before, the last matching line wins.
    segment_count_pattern = re.compile(b'Segment-Count: (\\d+)')
    segment_count = None
    for line in segment_data.split(b'\r\n'):
        match = segment_count_pattern.search(line)
        if match:
            segment_count = int(match.group(1).decode('utf-8'))

    # Fail with the package's own error (as seq_filesize does) rather than
    # hitting an unbound local below.
    if segment_count is None:
        raise RegexMatchError('seq_stream', segment_count_pattern.pattern)

    # We request these segments sequentially to build the file.
    for seq_num in range(1, segment_count + 1):
        querys['sq'] = seq_num
        segment_url = base_url + parse.urlencode(querys)
        yield from stream(segment_url, timeout=timeout, max_retries=max_retries)
def stream(
    url,
    timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
    max_retries=0
):
    """Read the response in chunks.

    The file is fetched with consecutive ``Range`` requests of at most
    ``default_range_size`` bytes. The real size is taken from the
    ``Content-Range`` header of a response; until then
    ``default_range_size`` stands in for it.

    :param str url: The URL to perform the GET request for.
    :param timeout: Timeout forwarded to each request.
    :param int max_retries:
        Number of extra attempts allowed per ranged request.
    :rtype: Iterable[bytes]
    :raises MaxRetriesExceeded:
        If a ranged request fails ``1 + max_retries`` times.
    """
    file_size: int = default_range_size  # fake filesize to start
    downloaded = 0
    while downloaded < file_size:
        stop_pos = min(downloaded + default_range_size, file_size) - 1
        range_header = f"bytes={downloaded}-{stop_pos}"
        tries = 0

        # Attempt to make the request multiple times as necessary.
        while True:
            # If the max retries is exceeded, raise an exception
            if tries >= 1 + max_retries:
                raise MaxRetriesExceeded()

            # Try to execute the request, ignoring socket timeouts
            try:
                response = _execute_request(
                    url,
                    method="GET",
                    headers={"Range": range_header},
                    timeout=timeout
                )
            except URLError as e:
                # We only want to skip over timeout errors, and
                # raise any other URLError exceptions
                if not isinstance(e.reason, socket.timeout):
                    raise
            except http.client.IncompleteRead:
                # Allow retries on IncompleteRead errors for unreliable
                # connections.
                # NOTE(review): the body is read outside this try block, so
                # an IncompleteRead raised by response.read() below is not
                # retried -- confirm whether that is intended.
                pass
            else:
                # On a successful request, break from loop
                break
            tries += 1

        # Close the response when this range is done, including when the
        # consumer abandons the generator part-way through.
        with response:
            if file_size == default_range_size:
                # A missing header yields None here; handle it explicitly
                # instead of failing on None.split().
                content_range = response.info().get("Content-Range")
                if content_range is None:
                    logger.error("Content-Range header missing from response")
                else:
                    try:
                        file_size = int(content_range.split("/")[1])
                    except (IndexError, ValueError) as e:
                        logger.error(e)

            while True:
                chunk = response.read()
                if not chunk:
                    break
                downloaded += len(chunk)
                yield chunk
@lru_cache()
def filesize(url):
    """Return the size in bytes of the file at ``url``.

    The size is read from the ``content-length`` entry of the headers
    returned by :func:`head`. Results are cached per URL by ``lru_cache``.

    :param str url: The URL to get the size of
    :returns: int: size in bytes of remote file
    """
    response_headers = head(url)
    content_length = response_headers["content-length"]
    return int(content_length)
@lru_cache()
def seq_filesize(url):
    """Fetch size in bytes of file at given URL from sequential requests

    The ``sq=0`` response is downloaded in full; its length is counted and
    its body scanned for a ``Segment-Count: <n>`` line. The
    ``content-length`` of a HEAD request for each of segments ``1..n`` is
    then added. Results are cached per URL by ``lru_cache``.

    :param str url: The URL to get the size of
    :returns: int: size in bytes of remote file
    :raises RegexMatchError:
        If no non-zero segment count is found in the ``sq=0`` response.
    """
    # YouTube expects a request sequence number as part of the parameters.
    split_url = parse.urlsplit(url)
    # ``path`` already starts with its own "/", so none is added here
    # (adding one would produce "host//path").
    base_url = f"{split_url.scheme}://{split_url.netloc}{split_url.path}?"
    querys = dict(parse.parse_qsl(split_url.query))

    # The 0th sequential request provides the file headers, which tell us
    # information about how the file is segmented.
    querys['sq'] = 0
    header_url = base_url + parse.urlencode(querys)
    # Close the response as soon as its body has been read.
    with _execute_request(header_url, method="GET") as response:
        response_value = response.read()

    # The file header must be added to the total filesize
    total_filesize = len(response_value)

    # We can then parse the header to find the number of segments
    segment_count = 0
    segment_regex = b'Segment-Count: (\\d+)'
    for line in response_value.split(b'\r\n'):
        # One of the lines should contain the segment count, but we don't
        # know which, so we need to iterate through the lines to find it
        try:
            segment_count = int(regex_search(segment_regex, line, 1))
        except RegexMatchError:
            pass

    if segment_count == 0:
        raise RegexMatchError('seq_filesize', segment_regex)

    # We make HEAD requests to the segments sequentially to find the total
    # filesize.
    for seq_num in range(1, segment_count + 1):
        querys['sq'] = seq_num
        segment_url = base_url + parse.urlencode(querys)
        total_filesize += int(head(segment_url)['content-length'])
    return total_filesize
def head(url):
    """Fetch the headers returned by an http HEAD request.

    :param str url:
        The URL to perform the HEAD request for.
    :rtype: dict
    :returns:
        dictionary of lowercase headers
    """
    # Close the response once the headers have been copied out.
    with _execute_request(url, method="HEAD") as response:
        return {
            name.lower(): value
            for name, value in response.info().items()
        }