|
|
|
"""Module for interacting with a user's youtube channel.""" |
|
import json |
|
import logging |
|
from typing import Dict, List, Optional, Tuple |
|
|
|
from pytube import extract, Playlist, request |
|
from pytube.helpers import uniqueify |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class Channel(Playlist): |
|
def __init__(self, url: str, proxies: Optional[Dict[str, str]] = None): |
|
"""Construct a :class:`Channel <Channel>`. |
|
|
|
:param str url: |
|
A valid YouTube channel URL. |
|
:param proxies: |
|
(Optional) A dictionary of proxies to use for web requests. |
|
""" |
|
super().__init__(url, proxies) |
|
|
|
self.channel_uri = extract.channel_name(url) |
|
|
|
self.channel_url = ( |
|
f"https://www.youtube.com{self.channel_uri}" |
|
) |
|
|
|
self.videos_url = self.channel_url + '/videos' |
|
self.playlists_url = self.channel_url + '/playlists' |
|
self.community_url = self.channel_url + '/community' |
|
self.featured_channels_url = self.channel_url + '/channels' |
|
self.about_url = self.channel_url + '/about' |
|
|
|
|
|
self._playlists_html = None |
|
self._community_html = None |
|
self._featured_channels_html = None |
|
self._about_html = None |
|
|
|
@property |
|
def channel_name(self): |
|
"""Get the name of the YouTube channel. |
|
|
|
:rtype: str |
|
""" |
|
return self.initial_data['metadata']['channelMetadataRenderer']['title'] |
|
|
|
@property |
|
def channel_id(self): |
|
"""Get the ID of the YouTube channel. |
|
|
|
This will return the underlying ID, not the vanity URL. |
|
|
|
:rtype: str |
|
""" |
|
return self.initial_data['metadata']['channelMetadataRenderer']['externalId'] |
|
|
|
@property |
|
def vanity_url(self): |
|
"""Get the vanity URL of the YouTube channel. |
|
|
|
Returns None if it doesn't exist. |
|
|
|
:rtype: str |
|
""" |
|
return self.initial_data['metadata']['channelMetadataRenderer'].get('vanityChannelUrl', None) |
|
|
|
@property |
|
def html(self): |
|
"""Get the html for the /videos page. |
|
|
|
:rtype: str |
|
""" |
|
if self._html: |
|
return self._html |
|
self._html = request.get(self.videos_url) |
|
return self._html |
|
|
|
@property |
|
def playlists_html(self): |
|
"""Get the html for the /playlists page. |
|
|
|
Currently unused for any functionality. |
|
|
|
:rtype: str |
|
""" |
|
if self._playlists_html: |
|
return self._playlists_html |
|
else: |
|
self._playlists_html = request.get(self.playlists_url) |
|
return self._playlists_html |
|
|
|
@property |
|
def community_html(self): |
|
"""Get the html for the /community page. |
|
|
|
Currently unused for any functionality. |
|
|
|
:rtype: str |
|
""" |
|
if self._community_html: |
|
return self._community_html |
|
else: |
|
self._community_html = request.get(self.community_url) |
|
return self._community_html |
|
|
|
@property |
|
def featured_channels_html(self): |
|
"""Get the html for the /channels page. |
|
|
|
Currently unused for any functionality. |
|
|
|
:rtype: str |
|
""" |
|
if self._featured_channels_html: |
|
return self._featured_channels_html |
|
else: |
|
self._featured_channels_html = request.get(self.featured_channels_url) |
|
return self._featured_channels_html |
|
|
|
@property |
|
def about_html(self): |
|
"""Get the html for the /about page. |
|
|
|
Currently unused for any functionality. |
|
|
|
:rtype: str |
|
""" |
|
if self._about_html: |
|
return self._about_html |
|
else: |
|
self._about_html = request.get(self.about_url) |
|
return self._about_html |
|
|
|
@staticmethod |
|
def _extract_videos(raw_json: str) -> Tuple[List[str], Optional[str]]: |
|
"""Extracts videos from a raw json page |
|
|
|
:param str raw_json: Input json extracted from the page or the last |
|
server response |
|
:rtype: Tuple[List[str], Optional[str]] |
|
:returns: Tuple containing a list of up to 100 video watch ids and |
|
a continuation token, if more videos are available |
|
""" |
|
initial_data = json.loads(raw_json) |
|
|
|
|
|
try: |
|
videos = initial_data["contents"][ |
|
"twoColumnBrowseResultsRenderer"][ |
|
"tabs"][1]["tabRenderer"]["content"][ |
|
"sectionListRenderer"]["contents"][0][ |
|
"itemSectionRenderer"]["contents"][0][ |
|
"gridRenderer"]["items"] |
|
except (KeyError, IndexError, TypeError): |
|
try: |
|
|
|
|
|
important_content = initial_data[1]['response']['onResponseReceivedActions'][ |
|
0 |
|
]['appendContinuationItemsAction']['continuationItems'] |
|
videos = important_content |
|
except (KeyError, IndexError, TypeError): |
|
try: |
|
|
|
|
|
|
|
important_content = initial_data['onResponseReceivedActions'][0][ |
|
'appendContinuationItemsAction']['continuationItems'] |
|
videos = important_content |
|
except (KeyError, IndexError, TypeError) as p: |
|
logger.info(p) |
|
return [], None |
|
|
|
try: |
|
continuation = videos[-1]['continuationItemRenderer'][ |
|
'continuationEndpoint' |
|
]['continuationCommand']['token'] |
|
videos = videos[:-1] |
|
except (KeyError, IndexError): |
|
|
|
continuation = None |
|
|
|
|
|
return ( |
|
uniqueify( |
|
list( |
|
|
|
map( |
|
lambda x: ( |
|
f"/watch?v=" |
|
f"{x['gridVideoRenderer']['videoId']}" |
|
), |
|
videos |
|
) |
|
), |
|
), |
|
continuation, |
|
) |
|
|