# -*- coding: utf-8 -*- """Module for interacting with a user's youtube channel.""" import json import logging from typing import Dict, List, Optional, Tuple from pytube import extract, Playlist, request from pytube.helpers import uniqueify logger = logging.getLogger(__name__) class Channel(Playlist): def __init__(self, url: str, proxies: Optional[Dict[str, str]] = None): """Construct a :class:`Channel `. :param str url: A valid YouTube channel URL. :param proxies: (Optional) A dictionary of proxies to use for web requests. """ super().__init__(url, proxies) self.channel_uri = extract.channel_name(url) self.channel_url = ( f"https://www.youtube.com{self.channel_uri}" ) self.videos_url = self.channel_url + '/videos' self.playlists_url = self.channel_url + '/playlists' self.community_url = self.channel_url + '/community' self.featured_channels_url = self.channel_url + '/channels' self.about_url = self.channel_url + '/about' # Possible future additions self._playlists_html = None self._community_html = None self._featured_channels_html = None self._about_html = None @property def channel_name(self): """Get the name of the YouTube channel. :rtype: str """ return self.initial_data['metadata']['channelMetadataRenderer']['title'] @property def channel_id(self): """Get the ID of the YouTube channel. This will return the underlying ID, not the vanity URL. :rtype: str """ return self.initial_data['metadata']['channelMetadataRenderer']['externalId'] @property def vanity_url(self): """Get the vanity URL of the YouTube channel. Returns None if it doesn't exist. :rtype: str """ return self.initial_data['metadata']['channelMetadataRenderer'].get('vanityChannelUrl', None) # noqa:E501 @property def html(self): """Get the html for the /videos page. :rtype: str """ if self._html: return self._html self._html = request.get(self.videos_url) return self._html @property def playlists_html(self): """Get the html for the /playlists page. Currently unused for any functionality. :rtype: str """ if self._playlists_html: return self._playlists_html else: self._playlists_html = request.get(self.playlists_url) return self._playlists_html @property def community_html(self): """Get the html for the /community page. Currently unused for any functionality. :rtype: str """ if self._community_html: return self._community_html else: self._community_html = request.get(self.community_url) return self._community_html @property def featured_channels_html(self): """Get the html for the /channels page. Currently unused for any functionality. :rtype: str """ if self._featured_channels_html: return self._featured_channels_html else: self._featured_channels_html = request.get(self.featured_channels_url) return self._featured_channels_html @property def about_html(self): """Get the html for the /about page. Currently unused for any functionality. :rtype: str """ if self._about_html: return self._about_html else: self._about_html = request.get(self.about_url) return self._about_html @staticmethod def _extract_videos(raw_json: str) -> Tuple[List[str], Optional[str]]: """Extracts videos from a raw json page :param str raw_json: Input json extracted from the page or the last server response :rtype: Tuple[List[str], Optional[str]] :returns: Tuple containing a list of up to 100 video watch ids and a continuation token, if more videos are available """ initial_data = json.loads(raw_json) # this is the json tree structure, if the json was extracted from # html try: videos = initial_data["contents"][ "twoColumnBrowseResultsRenderer"][ "tabs"][1]["tabRenderer"]["content"][ "sectionListRenderer"]["contents"][0][ "itemSectionRenderer"]["contents"][0][ "gridRenderer"]["items"] except (KeyError, IndexError, TypeError): try: # this is the json tree structure, if the json was directly sent # by the server in a continuation response important_content = initial_data[1]['response']['onResponseReceivedActions'][ 0 ]['appendContinuationItemsAction']['continuationItems'] videos = important_content except (KeyError, IndexError, TypeError): try: # this is the json tree structure, if the json was directly sent # by the server in a continuation response # no longer a list and no longer has the "response" key important_content = initial_data['onResponseReceivedActions'][0][ 'appendContinuationItemsAction']['continuationItems'] videos = important_content except (KeyError, IndexError, TypeError) as p: logger.info(p) return [], None try: continuation = videos[-1]['continuationItemRenderer'][ 'continuationEndpoint' ]['continuationCommand']['token'] videos = videos[:-1] except (KeyError, IndexError): # if there is an error, no continuation is available continuation = None # remove duplicates return ( uniqueify( list( # only extract the video ids from the video data map( lambda x: ( f"/watch?v=" f"{x['gridVideoRenderer']['videoId']}" ), videos ) ), ), continuation, )