|
"""This module contains all non-cipher related data extraction logic.""" |
|
import logging |
|
import urllib.parse |
|
import re |
|
from collections import OrderedDict |
|
from datetime import datetime |
|
from typing import Any, Dict, List, Optional, Tuple |
|
from urllib.parse import parse_qs, quote, urlencode, urlparse |
|
|
|
from pytube.cipher import Cipher |
|
from pytube.exceptions import HTMLParseError, LiveStreamError, RegexMatchError |
|
from pytube.helpers import regex_search |
|
from pytube.metadata import YouTubeMetadata |
|
from pytube.parser import parse_for_object, parse_for_all_objects |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def publish_date(watch_html: str) -> Optional[datetime]:
    """Extract the publish date from the watch page.

    :param str watch_html:
        The html contents of the watch page.
    :rtype: datetime or None
    :returns:
        The publish date parsed from the ``datePublished`` itemprop as a
        naive ``datetime`` (date only, time set to midnight), or ``None``
        when the search for that itemprop raises ``RegexMatchError``.
    """
    try:
        result = regex_search(
            r"(?<=itemprop=\"datePublished\" content=\")\d{4}-\d{2}-\d{2}",
            watch_html, group=0
        )
    except RegexMatchError:
        # No datePublished value in this html; signal "unknown" to callers.
        return None
    return datetime.strptime(result, '%Y-%m-%d')
|
|
|
|
|
def recording_available(watch_html: str) -> bool:
    """Check if live stream recording is available.

    :param str watch_html:
        The html contents of the watch page.
    :rtype: bool
    :returns:
        ``False`` if the html contains one of the known "recording is not
        available" messages, ``True`` otherwise.
    """
    unavailable_strings = [
        'This live stream recording is not available.'
    ]
    # Available unless at least one "unavailable" marker is present.
    return not any(string in watch_html for string in unavailable_strings)
|
|
|
|
|
def is_private(watch_html):
    """Determine whether the watch page reports a private video.

    :param str watch_html:
        The html contents of the watch page.
    :rtype: bool
    :returns:
        ``True`` if any of the known private-video markers occurs in the
        html, ``False`` otherwise.
    """
    markers = (
        "This is a private video. Please sign in to verify that you may see it.",
        "\"simpleText\":\"Private video\"",
        "This video is private.",
    )
    return any(marker in watch_html for marker in markers)
|
|
|
|
|
def is_age_restricted(watch_html: str) -> bool:
    """Tell whether the watch page is flagged as age restricted.

    :param str watch_html:
        The html contents of the watch page.
    :rtype: bool
    :returns:
        ``True`` when ``og:restrictions:age`` is found in the html,
        ``False`` when the search raises ``RegexMatchError``.
    """
    try:
        regex_search(r"og:restrictions:age", watch_html, group=0)
    except RegexMatchError:
        restricted = False
    else:
        restricted = True
    return restricted
|
|
|
|
|
def playability_status(watch_html: str) -> Tuple[Optional[str], Any]:
    """Return the playability status and status explanation of a video.

    For example, a video may have a status of LOGIN_REQUIRED, and an explanation
    of "This is a private video. Please sign in to verify that you may see it."

    This explanation is what gets incorporated into the media player overlay.

    :param str watch_html:
        The html contents of the watch page.
    :rtype: tuple
    :returns:
        A ``(status, explanation)`` pair, one of:

        - ``('LIVE_STREAM', 'Video is a live stream.')`` when the player
          response has a ``liveStreamability`` entry (explanation is a plain
          string here);
        - ``(status, [reason])`` when both ``status`` and ``reason`` are set;
        - ``(status, messages)`` when ``status`` and ``messages`` are set
          (``messages`` is passed through unchanged);
        - ``(None, [None])`` otherwise.
    """
    player_response = initial_player_response(watch_html)
    status_dict = player_response.get('playabilityStatus', {})
    if 'liveStreamability' in status_dict:
        # NOTE: unlike the branches below, the explanation is a bare string,
        # not a list. Kept as-is so existing callers see the same value.
        return 'LIVE_STREAM', 'Video is a live stream.'
    if 'status' in status_dict:
        if 'reason' in status_dict:
            return status_dict['status'], [status_dict['reason']]
        if 'messages' in status_dict:
            return status_dict['status'], status_dict['messages']
    return None, [None]
|
|
|
|
|
def video_id(url: str) -> str:
    """Pull the 11-character ``video_id`` out of a YouTube url.

    The following url shapes are supported:

    - :samp:`https://youtube.com/watch?v={video_id}`
    - :samp:`https://youtube.com/embed/{video_id}`
    - :samp:`https://youtu.be/{video_id}`

    :param str url:
        A YouTube url containing a video id.
    :rtype: str
    :returns:
        YouTube video id.
    """
    # The id is the first run of 11 id characters following "v=" or a "/".
    id_pattern = r"(?:v=|\/)([0-9A-Za-z_-]{11}).*"
    return regex_search(id_pattern, url, group=1)
|
|
|
|
|
def playlist_id(url: str) -> str:
    """Read the ``playlist_id`` from the query string of a YouTube url.

    The following url shapes are supported:

    - :samp:`https://youtube.com/playlist?list={playlist_id}`
    - :samp:`https://youtube.com/watch?v={video_id}&list={playlist_id}`

    :param str url:
        A YouTube url containing a playlist id.
    :rtype: str
    :returns:
        YouTube playlist id (the first ``list`` query parameter).
    :raises KeyError:
        If the url has no ``list`` query parameter.
    """
    query = urllib.parse.urlparse(url).query
    list_values = parse_qs(query)['list']
    return list_values[0]
|
|
|
|
|
def channel_name(url: str) -> str:
    """Extract the channel path (style plus name or id) from a YouTube url.

    The following url shapes are supported:

    - :samp:`https://youtube.com/c/{channel_name}/*`
    - :samp:`https://youtube.com/channel/{channel_id}/*`
    - :samp:`https://youtube.com/u/{channel_name}/*`
    - :samp:`https://youtube.com/user/{channel_id}/*`

    :param str url:
        A YouTube url containing a channel name.
    :rtype: str
    :returns:
        The channel path in the form :samp:`/{style}/{identifier}`, where
        style is one of ``c``, ``channel``, ``u`` or ``user``.
    :raises RegexMatchError:
        If none of the supported shapes is found in the url.
    """
    patterns = [
        r"(?:\/(c)\/([%\d\w_\-]+)(\/.*)?)",
        r"(?:\/(channel)\/([%\w\d_\-]+)(\/.*)?)",
        r"(?:\/(u)\/([%\d\w_\-]+)(\/.*)?)",
        r"(?:\/(user)\/([%\w\d_\-]+)(\/.*)?)"
    ]
    # Patterns are tried in list order; the first one that matches wins.
    for pattern in patterns:
        found = re.search(pattern, url)
        if found is None:
            continue
        logger.debug("finished regex search, matched: %s", pattern)
        uri_style, uri_identifier = found.group(1), found.group(2)
        return f'/{uri_style}/{uri_identifier}'

    raise RegexMatchError(
        caller="channel_name", pattern="patterns"
    )
|
|
|
|
|
def video_info_url(video_id: str, watch_url: str) -> str:
    """Build the get_video_info url for a regular video.

    :param str video_id:
        A YouTube video identifier.
    :param str watch_url:
        A YouTube watch url.
    :rtype: str
    :returns:
        :samp:`https://youtube.com/get_video_info` with necessary GET
        parameters.
    """
    # Insertion order is the order the parameters appear in the final url.
    query = OrderedDict()
    query["video_id"] = video_id
    query["ps"] = "default"
    # watch_url is quoted here and the result is handed to the shared helper.
    query["eurl"] = quote(watch_url)
    query["hl"] = "en_US"
    query["html5"] = "1"
    query["c"] = "TVHTML5"
    query["cver"] = "7.20201028"
    return _video_info_url(query)
|
|
|
|
|
def video_info_url_age_restricted(video_id: str, embed_html: str) -> str:
    """Build the get_video_info url for an age restricted video.

    :param str video_id:
        A YouTube video identifier.
    :param str embed_html:
        The html contents of the embed page (for age restricted videos).
    :rtype: str
    :returns:
        :samp:`https://youtube.com/get_video_info` with necessary GET
        parameters.
    """
    sts_pattern = r'"sts"\s*:\s*(\d+)'
    try:
        sts = regex_search(sts_pattern, embed_html, group=1)
    except RegexMatchError:
        # No "sts" value in the embed html; send the parameter empty.
        sts = ""

    # Insertion order is the order the parameters appear in the final url.
    query = OrderedDict()
    query["video_id"] = video_id
    query["eurl"] = f"https://youtube.googleapis.com/v/{video_id}"
    query["sts"] = sts
    query["html5"] = "1"
    query["c"] = "TVHTML5"
    query["cver"] = "7.20201028"
    return _video_info_url(query)
|
|
|
|
|
def _video_info_url(params: OrderedDict) -> str:
    """Append the url-encoded ``params`` to the get_video_info endpoint.

    :param OrderedDict params:
        GET parameters, encoded in iteration order.
    :rtype: str
    :returns:
        The full get_video_info url including the query string.
    """
    endpoint = "https://www.youtube.com/get_video_info?"
    return endpoint + urlencode(params)
|
|
|
|
|
def js_url(html: str) -> str:
    """Build the absolute url of the base JavaScript file.

    The base JavaScript contains the decipher "transforms".

    :param str html:
        The html contents of the watch page.
    :rtype: str
    :returns:
        ``https://youtube.com`` followed by the base.js path.
    """
    try:
        player_config = get_ytplayer_config(html)
        base_js = player_config['assets']['js']
    except (KeyError, RegexMatchError):
        # Config missing or lacking assets/js: search the html for the path.
        base_js = get_ytplayer_js(html)
    return "https://youtube.com" + base_js
|
|
|
|
|
def mime_type_codec(mime_type_codec: str) -> Tuple[str, List[str]]:
    """Split a serialized ``type`` value into mime type and codecs.

    The ``type`` key of the manifest carries the mime type and the codecs in
    one string; this separates them.

    **Example**:

    mime_type_codec('audio/webm; codecs="opus"') -> ('audio/webm', ['opus'])

    :param str mime_type_codec:
        String containing mime type and codecs.
    :rtype: tuple
    :returns:
        The mime type and a list of codecs.
    :raises RegexMatchError:
        If the string does not have the expected shape.
    """
    pattern = r"(\w+\/\w+)\;\scodecs=\"([a-zA-Z-0-9.,\s]*)\""
    found = re.search(pattern, mime_type_codec)
    if found is None:
        raise RegexMatchError(caller="mime_type_codec", pattern=pattern)
    mime_type = found.group(1)
    raw_codecs = found.group(2)
    # Codecs are comma separated; drop the whitespace around each entry.
    codec_list = list(map(str.strip, raw_codecs.split(",")))
    return mime_type, codec_list
|
|
|
|
|
def get_ytplayer_js(html: str) -> Any:
    """Find the path of the YouTube player base JavaScript in the html.

    :param str html:
        The html contents of the watch page.
    :rtype: str
    :returns:
        Path to YouTube's base.js file.
    :raises RegexMatchError:
        If no known base.js path pattern is found in the html.
    """
    js_url_patterns = [
        r"(/s/player/[\w\d]+/[\w\d_/.]+/base\.js)"
    ]
    # Patterns are tried in list order; the first one that matches wins.
    for pattern in js_url_patterns:
        found = re.search(pattern, html)
        if found is None:
            continue
        logger.debug("finished regex search, matched: %s", pattern)
        return found.group(1)

    raise RegexMatchError(
        caller="get_ytplayer_js", pattern="js_url_patterns"
    )
|
|
|
|
|
def get_ytplayer_config(html: str) -> Any:
    """Get the YouTube player configuration data from the watch html.

    Extract the ``ytplayer_config``, which is json data embedded within the
    watch html and serves as the primary source of obtaining the stream
    manifest data.

    :param str html:
        The html contents of the watch page.
    :rtype: Any
    :returns:
        The object returned by ``parse_for_object`` for the first pattern
        that parses successfully.
    :raises RegexMatchError:
        If none of the known patterns yields an object.
    """
    logger.debug("finding ytplayer config")
    config_patterns = [
        r"ytplayer\.config\s*=\s*",
        r"ytInitialPlayerResponse\s*=\s*"
    ]
    for pattern in config_patterns:
        try:
            return parse_for_object(html, pattern)
        except HTMLParseError as e:
            # Lazy %-style args, matching the other logger calls in this file.
            logger.debug('Pattern failed: %s', pattern)
            logger.debug(e)

    # Fall back to the config passed to yt.setConfig(...) under PLAYER_CONFIG.
    setconfig_patterns = [
        r"yt\.setConfig\(.*['\"]PLAYER_CONFIG['\"]:\s*"
    ]
    for pattern in setconfig_patterns:
        try:
            return parse_for_object(html, pattern)
        except HTMLParseError:
            # This pattern did not parse; try the next one.
            pass

    raise RegexMatchError(
        caller="get_ytplayer_config", pattern="config_patterns, setconfig_patterns"
    )
|
|
|
|
|
def get_ytcfg(html: str) -> Dict:
    """Get the entirety of the ytcfg object.

    This is built over multiple pieces, so we have to find all matches and
    combine the dicts together.

    :param str html:
        The html contents of the watch page.
    :rtype: dict
    :returns:
        A single dict merged from every object found for the known ytcfg
        patterns; when a key occurs more than once the later object wins.
    :raises RegexMatchError:
        If no ytcfg data at all could be collected.
    """
    ytcfg = {}
    ytcfg_patterns = [
        r"ytcfg\s=\s",
        r"ytcfg\.set\("
    ]
    for pattern in ytcfg_patterns:
        try:
            found_objects = parse_for_all_objects(html, pattern)
            for obj in found_objects:
                ytcfg.update(obj)
        except HTMLParseError:
            # Nothing parseable for this pattern; try the next one.
            continue

    if ytcfg:
        return ytcfg

    raise RegexMatchError(
        caller="get_ytcfg", pattern="ytcfg_patterns"
    )
|
|
|
|
|
def apply_signature(stream_manifest: List[Dict], vid_info: Dict, js: str) -> None:
    """Apply the decrypted signature to the stream manifest.

    Each stream dict is updated in place: its ``url`` is rewritten to carry
    the deciphered ``sig`` query parameter (and a recalculated ``n``
    parameter when applicable). Streams whose url already carries a
    signature are left untouched.

    :param list stream_manifest:
        Details of the media streams available (a list of stream dicts).
    :param dict vid_info:
        Player response data; only ``playabilityStatus.liveStreamability``
        is read, to recognise live streams that have no url.
    :param str js:
        The contents of the base.js asset file.
    :raises LiveStreamError:
        If a stream has no ``url`` and the video is a live stream.
    :raises KeyError:
        If a stream has no ``url`` and the video is not a live stream.
    """
    cipher = Cipher(js=js)

    for stream in stream_manifest:
        try:
            url: str = stream["url"]
        except KeyError:
            live_stream = (
                vid_info.get("playabilityStatus", {})
                .get("liveStreamability")
            )
            if live_stream:
                raise LiveStreamError("UNKNOWN")
            # Not a live stream and no url to sign. Re-raise instead of
            # falling through, which would hit an unbound ``url`` on the
            # first stream or silently reuse the previous stream's url.
            raise

        if "signature" in url or (
            "s" not in stream and ("&sig=" in url or "&lsig=" in url)
        ):
            logger.debug("signature found, skip decipher")
            continue

        signature = cipher.get_signature(ciphered_signature=stream["s"])

        logger.debug(
            "finished descrambling signature for itag=%s", stream["itag"]
        )
        parsed_url = urlparse(url)

        # parse_qs yields lists of values; keep the first value of each key.
        query_params = {
            k: v[0] for k, v in parse_qs(parsed_url.query).items()
        }
        query_params['sig'] = signature
        # Recalculate ``n`` only when it is present and ``ratebypass`` is not;
        # a url with neither has nothing to transform.
        if 'ratebypass' not in query_params and 'n' in query_params:
            initial_n = list(query_params['n'])
            query_params['n'] = cipher.calculate_n(initial_n)

        url = (
            f'{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}'
            f'?{urlencode(query_params)}'
        )

        stream["url"] = url
|
|
|
|
|
def apply_descrambler(stream_data: Dict) -> Optional[List[Dict]]:
    """Merge the stream formats into one list and normalise each entry.

    The entries of ``stream_data['formats']`` and
    ``stream_data['adaptiveFormats']`` (whichever are present) are collected
    into a single list. Each format dict is then updated in place:

    - when it has no ``url`` but has a ``signatureCipher``, that value is
      parsed as a query string and its first ``url`` and ``s`` values are
      stored under ``url`` and ``s``;
    - ``is_otf`` is set to whether ``type`` equals
      ``'FORMAT_STREAM_TYPE_OTF'``.

    :param dict stream_data:
        Dictionary that may contain ``formats`` and ``adaptiveFormats``.
    :rtype: list or None
    :returns:
        The merged list of format dicts, or ``None`` when ``stream_data``
        already has a ``url`` key (nothing is processed in that case).
    """
    if 'url' in stream_data:
        return None

    # Merge formats and adaptiveFormats into a single list.
    formats = []
    if 'formats' in stream_data:
        formats.extend(stream_data['formats'])
    if 'adaptiveFormats' in stream_data:
        formats.extend(stream_data['adaptiveFormats'])

    # Extract url and s from the signatureCipher where needed.
    for data in formats:
        if 'url' not in data:
            if 'signatureCipher' in data:
                cipher_url = parse_qs(data['signatureCipher'])
                data['url'] = cipher_url['url'][0]
                data['s'] = cipher_url['s'][0]
        data['is_otf'] = data.get('type') == 'FORMAT_STREAM_TYPE_OTF'

    logger.debug("applying descrambler")
    return formats
|
|
|
|
|
def initial_data(watch_html: str) -> Any:
    """Extract the ytInitialData json from the watch_html page.

    This mostly contains metadata necessary for rendering the page on-load,
    such as video information, copyright notices, etc.

    :param str watch_html:
        The html contents of the watch page.
    :rtype: Any
    :returns:
        The object returned by ``parse_for_object`` for the first pattern
        that parses successfully.
    :raises RegexMatchError:
        If none of the known ytInitialData patterns yields an object.
    """
    patterns = [
        r"window\[['\"]ytInitialData['\"]]\s*=\s*",
        r"ytInitialData\s*=\s*"
    ]
    for pattern in patterns:
        try:
            return parse_for_object(watch_html, pattern)
        except HTMLParseError:
            # This pattern did not parse; try the next one.
            pass

    raise RegexMatchError(caller='initial_data', pattern='initial_data_pattern')
|
|
|
|
|
def initial_player_response(watch_html: str) -> Any:
    """Extract the ytInitialPlayerResponse json from the watch_html page.

    Among other things this carries the ``playabilityStatus`` entry that
    :func:`playability_status` reads.

    :param str watch_html:
        The html contents of the watch page.
    :rtype: Any
    :returns:
        The object returned by ``parse_for_object`` for the first pattern
        that parses successfully.
    :raises RegexMatchError:
        If none of the known ytInitialPlayerResponse patterns yields an
        object.
    """
    patterns = [
        r"window\[['\"]ytInitialPlayerResponse['\"]]\s*=\s*",
        r"ytInitialPlayerResponse\s*=\s*"
    ]
    for pattern in patterns:
        try:
            return parse_for_object(watch_html, pattern)
        except HTMLParseError:
            # This pattern did not parse; try the next one.
            pass

    raise RegexMatchError(
        caller='initial_player_response',
        pattern='initial_player_response_pattern'
    )
|
|
|
|
|
def metadata(initial_data) -> Optional[YouTubeMetadata]:
    """Get the informational metadata for the video.

    e.g.:
    [
        {
            'Song': '강남스타일(Gangnam Style)',
            'Artist': 'PSY',
            'Album': 'PSY SIX RULES Pt.1',
            'Licensed to YouTube by': 'YG Entertainment Inc. [...]'
        }
    ]

    :param initial_data:
        Parsed ytInitialData; the metadata rows are read from
        ``contents.twoColumnWatchNextResults.results.results.contents[1]``.
    :rtype: YouTubeMetadata
    :returns:
        A ``YouTubeMetadata`` built from the ``metadataRowRenderer`` rows,
        or an empty ``YouTubeMetadata`` when the expected path is missing.
    """
    try:
        metadata_rows: List = initial_data["contents"]["twoColumnWatchNextResults"][
            "results"]["results"]["contents"][1]["videoSecondaryInfoRenderer"][
            "metadataRowContainer"]["metadataRowContainerRenderer"]["rows"]
    except (KeyError, IndexError):
        # The expected path is missing: report empty metadata.
        return YouTubeMetadata([])

    # Keep only rows that carry a metadataRowRenderer, unwrapped, in a single
    # pass; rows without that key are skipped.
    renderers = [
        row["metadataRowRenderer"]
        for row in metadata_rows
        if "metadataRowRenderer" in row
    ]

    return YouTubeMetadata(renderers)
|
|