import io
import re
import time
from io import BytesIO

import gradio as gr
import matplotlib.pyplot as plt
import pandas as pd
from PIL import Image
from transformers import pipeline
from wordcloud import WordCloud, STOPWORDS
from youtubesearchpython import Comments

# Sentiment model fine-tuned on tweets; its labels are 'positive', 'neutral', and 'negative'.
sentiment_task = pipeline(
    "sentiment-analysis",
    model="cardiffnlp/twitter-roberta-base-sentiment-latest",
    tokenizer="cardiffnlp/twitter-roberta-base-sentiment-latest",
)


def extract_youtube_video_id(url_or_id):
    """
    Extracts the YouTube video ID from a given URL, or returns the input
    unchanged if it is already a bare video ID.

    Args:
        url_or_id (str): A YouTube URL or a video ID.

    Returns:
        str: The extracted YouTube video ID, or an error message if no
        pattern matches.
    """
    # Check if it's already a valid YouTube ID (typically 11 characters)
    if len(url_or_id) == 11 and not re.search(r'[^0-9A-Za-z_-]', url_or_id):
        return url_or_id

    # Regular expressions for the common YouTube URL formats
    regex_patterns = [
        r'(?:https?://)?www\.youtube\.com/watch\?v=([0-9A-Za-z_-]{11})',
        r'(?:https?://)?youtu\.be/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?www\.youtube\.com/embed/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?www\.youtube\.com/v/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?www\.youtube\.com/shorts/([0-9A-Za-z_-]{11})',
    ]

    # Try each pattern until one matches
    for pattern in regex_patterns:
        match = re.search(pattern, url_or_id)
        if match:
            return match.group(1)

    # If no pattern matches, return a specific error message
    return "Invalid YouTube URL or ID"
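
# Quick sanity check for the extractor (illustrative only; the 11-character ID
# below is a hypothetical placeholder, not a reference to any real video):
#
#   extract_youtube_video_id("https://youtu.be/abcdefghijk")                  # -> "abcdefghijk"
#   extract_youtube_video_id("https://www.youtube.com/watch?v=abcdefghijk")   # -> "abcdefghijk"
#   extract_youtube_video_id("not a link")                                    # -> "Invalid YouTube URL or ID"
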
def comments_collector(video_link, max_comments=50):
    """
    Collects comments from a given YouTube video link.

    Uses the youtubesearchpython library to fetch comments and pandas for
    data manipulation. Comments written by the video's author are skipped.

    Args:
        video_link (str): The YouTube video link from which to collect comments.
        max_comments (int, optional): The maximum number of comments to retrieve. Defaults to 50.

    Returns:
        pandas.DataFrame: A DataFrame containing the comments, or None if an exception occurs.
    """
    video_id = extract_youtube_video_id(video_link)
    try:
        # Load the first batch of comments (the library fetches about 20 at a time)
        comments = Comments(video_id)
        print('Comments Retrieved and Loading...')

        # Keep loading more comments, 20 at a time, until the limit is reached
        while comments.hasMoreComments and len(comments.comments["result"]) < max_comments:
            comments.getNextComments()
        print(f'Found all the {len(comments.comments["result"])} comments.')

        # Unwrap the raw result list
        comments = comments.comments

        # Collect the cleaned comments for this video
        data = []
        for i in range(len(comments['result'])):
            # Skip comments that come from the video author
            if comments['result'][i]['authorIsChannelOwner']:
                continue

            comment_dict = {}
            comment_id = comments['result'][i]['id']
            author = comments['result'][i]['author']['name']
            content = comments['result'][i]['content']

            # Clean the like count (e.g., convert '10K' to 10000, and None to 0);
            # an 'M' suffix is rare on comments but handled for completeness
            label = comments['result'][i]['votes']['label']
            if label is None:
                likes = 0
            else:
                likes = label.split(' ')[0]
                if 'K' in likes:
                    likes = int(float(likes.replace('K', '')) * 1_000)
                elif 'M' in likes:
                    likes = int(float(likes.replace('M', '')) * 1_000_000)
                else:
                    likes = int(likes)

            # Clean the reply count: log missing values as 0, otherwise as an integer
            reply_count = comments['result'][i]['replyCount']
            comment_dict['replyCount'] = 0 if reply_count is None else int(reply_count)

            comment_dict['comment_id'] = comment_id
            comment_dict['author'] = author
            comment_dict['content'] = content
            comment_dict['likes'] = likes
            data.append(comment_dict)

        print(f'Excluding author comments, we ended up with {len(data)} comments')
        return pd.DataFrame(data)
    except Exception as e:
        print(e)
        return None
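
# Illustrative usage (requires network access, and assumes youtubesearchpython
# can still reach YouTube's comment endpoint; the link is a placeholder):
#
#   df = comments_collector("https://www.youtube.com/watch?v=<VIDEO_ID>", max_comments=100)
#   if df is not None:
#       print(df[['author', 'likes', 'replyCount']].head())
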
def comments_analyzer(comments_df):
    """
    Analyzes the sentiment of the comments in a DataFrame produced by
    comments_collector.

    Each comment is classified as positive, neutral, or negative along with a
    confidence score. The function also counts comments per sentiment,
    computes the percentage of positive comments, and blends all comments
    into a single string for the word cloud.

    Args:
        comments_df (pandas.DataFrame): A DataFrame containing YouTube comments.

    Returns:
        tuple: (analysis dict, top positive comments, top negative comments),
        or (None, None, None) if the input is None.
    """
    if comments_df is None:
        return None, None, None

    start_time = time.time()

    # Batch the comments through the sentiment pipeline; adjust the batch size
    # based on your system's capabilities. truncation=True guards against
    # comments longer than the model's maximum input length.
    batch_size = 20
    sentiments = []
    scores = []
    for i in range(0, len(comments_df), batch_size):
        batch = comments_df['content'][i:i + batch_size].tolist()
        batch_results = sentiment_task(batch, truncation=True)
        # Extract both the sentiment labels and the confidence scores
        sentiments.extend(item['label'] for item in batch_results)
        scores.extend(item['score'] for item in batch_results)

    comments_df['sentiment'] = sentiments
    comments_df['score'] = scores

    end_time = time.time()
    print(f"Time taken for batch sentiment analysis: {end_time - start_time} seconds")

    def get_top_comments(comments, sentiment_type, top_n=5):
        # Return the top_n highest-confidence comments of the given sentiment
        filtered_comments = comments[comments['sentiment'] == sentiment_type]
        top_comments = filtered_comments.nlargest(top_n, 'score')
        if not top_comments.empty:
            return '\n\n'.join(f"{row['content']} - {row['author']}" for _, row in top_comments.iterrows())
        return f"No {sentiment_type} comments available."

    start_time = time.time()
    # Get the top positive and top negative comments
    top_positive_comments = get_top_comments(comments_df, 'positive')
    top_negative_comments = get_top_comments(comments_df, 'negative')
    end_time = time.time()
    print(f"Time taken for finding top n positive/negative comments: {end_time - start_time} seconds")

    # Categorize the comments by sentiment and count them
    data = {}
    data['total_comments'] = len(comments_df)
    data['num_positive'] = comments_df['sentiment'].value_counts().get('positive', 0)
    data['num_neutral'] = comments_df['sentiment'].value_counts().get('neutral', 0)
    data['num_negative'] = comments_df['sentiment'].value_counts().get('negative', 0)

    # Blend all the comments into one string for the word cloud
    data['blended_comments'] = comments_df['content'].str.cat(sep=' ')
    data['pct_positive'] = round(100 * data['num_positive'] / data['total_comments'], 2) if data['total_comments'] else 0

    return data, top_positive_comments, top_negative_comments


def generate_wordcloud(long_text, additional_stopwords=('Timestamps', 'timestamps')):
    """
    Generates a word cloud image from the given text and returns it as a PIL
    image object.

    The word cloud uses a capped font size and word count on a black
    background; the matplotlib plot is converted to a PIL Image for further
    use or saving.

    Args:
        long_text (str): The text from which to generate the word cloud.
        additional_stopwords (iterable, optional): Words to exclude from the word cloud.

    Returns:
        PIL.Image: The generated word cloud as a PIL image object.
    """
    # Combine wordcloud's default STOPWORDS with the manually specified ones
    all_stopwords = set(STOPWORDS).union(additional_stopwords)

    # Create the word cloud
    wordcloud = WordCloud(
        max_font_size=50,
        max_words=20,
        background_color="black",
        stopwords=all_stopwords,
        colormap='plasma',
    ).generate(long_text)

    # Render the word cloud on a matplotlib figure
    plt.figure(figsize=(10, 10), facecolor=None)
    plt.imshow(wordcloud, interpolation="bilinear")
    plt.axis("off")
    plt.tight_layout(pad=0)

    # Save to a BytesIO object
    img_buf = io.BytesIO()
    plt.savefig(img_buf, format='png', bbox_inches='tight', pad_inches=0)
    img_buf.seek(0)

    # Close the figure to prevent display
    plt.close()

    # Use PIL to open the image from the BytesIO object
    image = Image.open(img_buf)
    return image
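
# Illustrative call (the extra stop words and the 'analysis' variable are
# arbitrary examples, not part of the app's flow):
#
#   img = generate_wordcloud(analysis['blended_comments'],
#                            additional_stopwords=['Timestamps', 'video', 'channel'])
#   img.save('wordcloud.png')
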
def create_sentiment_analysis_chart(data):
    """
    Creates a bar chart of the sentiment analysis results and returns it as a
    PIL image object.

    The input counts are converted to a single-row DataFrame and plotted as a
    bar chart with fixed colors per sentiment type; the figure is then saved
    to a BytesIO buffer and reopened with PIL.

    Args:
        data (dict): A dictionary containing the counts of positive, negative, and neutral comments.

    Returns:
        PIL.Image: The sentiment analysis bar chart as a PIL image object.
    """
    # Convert the counts to a single-row DataFrame
    df = pd.DataFrame(
        {
            'num_positive': data['num_positive'],
            'num_negative': data['num_negative'],
            'num_neutral': data['num_neutral'],
        },
        index=[0],
    )

    # Plot the bars; colors map to positive, negative, neutral in column order
    bar_colors = ['green', 'red', 'blue']
    ax = df.plot(kind='bar', figsize=(8, 6), color=bar_colors, legend=True)

    # Add titles and labels for clarity
    ax.set_title('Sentiment Analysis Results')
    ax.set_xlabel('Sentiment Types')
    ax.set_ylabel('Number of Comments')
    ax.set_xticks([0])
    ax.set_xticklabels(['Sentiments'], rotation=0)
    ax.legend(['Positive', 'Negative', 'Neutral'])

    # Save the plot to a BytesIO object
    buf = BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)

    # Close the figure to prevent display
    plt.close(ax.figure)

    # Use PIL to open the image from the BytesIO object
    image = Image.open(buf)
    return image


############################################################################################################################################
def process_youtube_comments(youtube_link, max_comments, stop_words):
    """
    End-to-end pipeline: pull comments from the YouTube video, analyze their
    sentiment, and generate the word cloud and sentiment chart.
    """
    # Pull comments from the YouTube video
    start_time = time.time()
    comments_df = comments_collector(video_link=youtube_link, max_comments=int(max_comments))
    end_time = time.time()
    print(f"Time taken for loading comments: {end_time - start_time} seconds")

    # Analyze the comments; surface a Gradio error if collection failed
    analysis_dict, top_positive_comments, top_negative_comments = comments_analyzer(comments_df)
    if analysis_dict is None:
        raise gr.Error("Could not retrieve comments for this link. Please check the URL and try again.")
    long_text = analysis_dict['blended_comments']

    # Parse the user-supplied, comma-separated stop words and merge them with the defaults
    user_stopwords = [w.strip() for w in stop_words.split(',') if w.strip()] if stop_words else []

    # Generate the word cloud
    start_time = time.time()
    word_cloud_img = generate_wordcloud(long_text, additional_stopwords=['Timestamps', 'timestamps'] + user_stopwords)
    end_time = time.time()
    print(f"Time taken for generating word clouds: {end_time - start_time} seconds")

    # Create the sentiment chart
    start_time = time.time()
    sentiment_chart = create_sentiment_analysis_chart(analysis_dict)
    end_time = time.time()
    print(f"Time taken for creating sentiment chart: {end_time - start_time} seconds")

    # Return the word cloud image, top comments, and sentiment analysis chart
    return word_cloud_img, top_positive_comments, top_negative_comments, sentiment_chart


############################################################################################################################################
# Gradio interface
interface = gr.Interface(
    fn=process_youtube_comments,
    inputs=[
        gr.Textbox(label="YouTube Video Link"),
        gr.Number(label="Maximum Comments", value=50),
        gr.Textbox(label="Words to exclude from cloud (comma-separated)"),
    ],
    outputs=[
        gr.Image(label="Word Cloud ☁️"),
        gr.Textbox(label="Top 5 Positive Comments 👍🏻"),
        gr.Textbox(label="Top 5 Negative Comments 👎🏻"),
        gr.Image(label="Sentiment Analysis Chart 📊"),
    ],
    title="YouTube Comments Analyzer 📈",
    description="Enter a YouTube link to generate a word cloud, top positive and negative comments, "
                "and a sentiment analysis of the comments. \n\n Note: The app is both desktop 🖥️/mobile 📱 "
                "compatible. Depending on the number of comments found, it can take up to 1 - 2 minutes to "
                "process. Have fun 😀!",
)

# Run the interface
interface.launch()
############################################################################################################################################
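
# A minimal sketch of alternative launch options (both are standard Gradio
# parameters; which to use depends on where the app is deployed):
#
#   interface.launch(share=True)        # temporary public URL for quick demos
#   interface.launch(server_port=7860)  # pin the local port explicitly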