import gradio as gr
from transformers import pipeline
from wordcloud import WordCloud, STOPWORDS
from youtubesearchpython import Comments  # Comments is the only class used below
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import re
import io
from io import BytesIO
import time
sentiment_task = pipeline(
    "sentiment-analysis",
    model="cardiffnlp/twitter-roberta-base-sentiment-latest",
    tokenizer="cardiffnlp/twitter-roberta-base-sentiment-latest",
)
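
# Note: each sentiment_task call returns a list of dicts such as
# [{'label': 'positive', 'score': 0.98}] (the score here is illustrative);
# this model's labels are 'positive', 'neutral', and 'negative', which
# comments_analyzer below relies on.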
def extract_youtube_video_id(url_or_id):
    """
    Extracts the YouTube video ID from a given URL, or returns the ID
    unchanged if a bare ID is provided.

    Args:
        url_or_id (str): A YouTube URL or a video ID.

    Returns:
        str: The extracted YouTube video ID, or an error message if no
        pattern matches.
    """
    # A bare video ID is typically 11 characters from the set [0-9A-Za-z_-]
    if len(url_or_id) == 11 and not re.search(r'[^0-9A-Za-z_-]', url_or_id):
        return url_or_id

    # Regular expressions for the common YouTube URL formats
    regex_patterns = [
        r'(?:https?://)?www\.youtube\.com/watch\?v=([0-9A-Za-z_-]{11})',
        r'(?:https?://)?youtu\.be/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?www\.youtube\.com/embed/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?www\.youtube\.com/v/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?www\.youtube\.com/shorts/([0-9A-Za-z_-]{11})',
    ]

    # Try each pattern until one matches
    for pattern in regex_patterns:
        match = re.search(pattern, url_or_id)
        if match:
            return match.group(1)

    # No pattern matched
    return "Invalid YouTube URL or ID"
def comments_collector(video_link, max_comments=50):
    """
    Collects comments from a given YouTube video link using the
    youtubesearchpython library, and returns them as a pandas DataFrame.

    Args:
        video_link (str): The YouTube video link from which to collect comments.
        max_comments (int, optional): The maximum number of comments to retrieve.
            Defaults to 50.

    Returns:
        pandas.DataFrame: A DataFrame containing the comments, or None in case
        of an exception.
    """
    video_id = extract_youtube_video_id(video_link)
    try:
        # Load the first batch of comments (the library fetches ~20 at a time)
        comments = Comments(video_id)
        print('Comments Retrieved and Loading...')

        # Load more comments, 20 at a time, until the limit is reached
        while comments.hasMoreComments and len(comments.comments["result"]) < max_comments:
            comments.getNextComments()
        print(f'Found all the {len(comments.comments["result"])} comments.')

        # The last batch may overshoot the limit, so trim to max_comments
        results = comments.comments['result'][:max_comments]

        # Collect the comments for this video
        data = []
        for comment in results:
            # Skip comments written by the video author
            if comment['authorIsChannelOwner']:
                continue

            comment_dict = {}
            comment_dict['comment_id'] = comment['id']
            comment_dict['author'] = comment['author']['name']
            comment_dict['content'] = comment['content']

            # Clean the like count (e.g., convert '10K' to 10000, a missing
            # label to 0, and defensively handle 'M' suffixes as well)
            label = comment['votes']['label']
            if label is None:
                likes = 0
            else:
                likes = label.split(' ')[0].replace(',', '')
                if 'K' in likes:
                    likes = int(float(likes.replace('K', '')) * 1_000)
                elif 'M' in likes:
                    likes = int(float(likes.replace('M', '')) * 1_000_000)
                else:
                    likes = int(likes)
            comment_dict['likes'] = likes

            # Clean the reply count: log a missing count as 0, otherwise as int
            reply_count = comment['replyCount']
            comment_dict['replyCount'] = 0 if reply_count is None else int(reply_count)

            data.append(comment_dict)

        print(f'Excluding author comments, we ended up with {len(data)} comments')
        return pd.DataFrame(data)
    except Exception as e:
        print(e)
        return None
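
# Example usage (the link is illustrative; any public video with comments works):
#   df = comments_collector("https://www.youtube.com/watch?v=abc123XYZ_-", max_comments=50)
#   df[['author', 'content', 'likes', 'replyCount']].head()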
def comments_analyzer(comments_df):
    """
    Analyzes the sentiment of comments in a DataFrame, typically produced by
    comments_collector. Each comment is classified in batches as positive,
    neutral, or negative; the top positive/negative comments are extracted;
    and all comments are blended into a single string for the word cloud.

    Args:
        comments_df (pandas.DataFrame): A DataFrame containing YouTube comments.

    Returns:
        tuple: (data, top_positive_comments, top_negative_comments), where
        data is a dict of sentiment counts and percentages, or None if the
        input is None.
    """
    if comments_df is None:
        return None

    start_time = time.time()
    # Batch the comments through the sentiment pipeline, collecting both the
    # predicted label and its confidence score
    batch_size = 20  # Adjust based on your system's capabilities
    sentiments = []
    scores = []
    for i in range(0, len(comments_df), batch_size):
        batch = comments_df['content'][i:i + batch_size].tolist()
        # truncation=True guards against comments longer than the model's
        # maximum input length
        batch_results = sentiment_task(batch, truncation=True)
        sentiments.extend(item['label'] for item in batch_results)
        scores.extend(item['score'] for item in batch_results)
    comments_df['sentiment'] = sentiments
    comments_df['score'] = scores
    end_time = time.time()
    print(f"Time taken for batch sentiment analysis: {end_time - start_time} seconds")

    def get_top_comments(comments, sentiment_type, top_n=5):
        # Return the top_n highest-confidence comments of the given sentiment
        filtered_comments = comments[comments['sentiment'] == sentiment_type]
        top_comments = filtered_comments.nlargest(top_n, 'score')
        if not top_comments.empty:
            return '\n\n'.join(f"{row['content']} - {row['author']}" for _, row in top_comments.iterrows())
        else:
            return f"No {sentiment_type} comments available."

    start_time = time.time()
    # Get the top positive and negative comments
    top_positive_comments = get_top_comments(comments_df, 'positive')
    top_negative_comments = get_top_comments(comments_df, 'negative')
    end_time = time.time()
    print(f"Time taken for finding top n positive/negative comments: {end_time - start_time} seconds")

    # Categorize the comments by sentiment and count them
    data = {}
    data['total_comments'] = len(comments_df)
    data['num_positive'] = comments_df['sentiment'].value_counts().get('positive', 0)
    data['num_neutral'] = comments_df['sentiment'].value_counts().get('neutral', 0)
    data['num_negative'] = comments_df['sentiment'].value_counts().get('negative', 0)
    # Blend all the comments into one string for the word cloud
    data['blended_comments'] = comments_df['content'].str.cat(sep=' ')
    data['pct_positive'] = 100 * round(data['num_positive'] / data['total_comments'], 2)

    return data, top_positive_comments, top_negative_comments
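
# Example usage, following on from comments_collector:
#   data, top_pos, top_neg = comments_analyzer(df)
#   print(f"{data['pct_positive']}% of {data['total_comments']} comments are positive")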
def generate_wordcloud(long_text, additional_stopwords=('Timestamps', 'timestamps')):
    """
    Generates a word cloud image from the given text and returns it as a PIL
    image object.

    Args:
        long_text (str): The text from which to generate the word cloud.
        additional_stopwords (iterable, optional): Extra words to exclude from
            the word cloud.

    Returns:
        PIL.Image: The generated word cloud as a PIL image object.
    """
    # Combine the library's default STOPWORDS with the manually specified ones
    all_stopwords = set(STOPWORDS).union(additional_stopwords)

    # Create the word cloud
    wordcloud = WordCloud(
        max_font_size=50,
        max_words=20,
        background_color="black",
        stopwords=all_stopwords,
        colormap='plasma',
    ).generate(long_text)

    # Render the word cloud with matplotlib
    plt.figure(figsize=(10, 10), facecolor=None)
    plt.imshow(wordcloud, interpolation="bilinear")
    plt.axis("off")
    plt.tight_layout(pad=0)

    # Save to a BytesIO object and close the figure to prevent display
    img_buf = io.BytesIO()
    plt.savefig(img_buf, format='png', bbox_inches='tight', pad_inches=0)
    img_buf.seek(0)
    plt.close()

    # Open the image from the BytesIO object with PIL
    image = Image.open(img_buf)
    return image
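
# Example usage with the blended comments produced by comments_analyzer:
#   img = generate_wordcloud(data['blended_comments'], additional_stopwords=['video'])
#   img.save('wordcloud.png')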
def create_sentiment_analysis_chart(data):
    """
    Creates a bar chart of the sentiment analysis results and returns it as a
    PIL image object.

    Args:
        data (dict): A dictionary containing the counts of positive, negative,
            and neutral comments.

    Returns:
        PIL.Image: The sentiment analysis bar chart as a PIL image object.
    """
    # Convert the counts to a single-row DataFrame
    df = pd.DataFrame(
        {
            'num_positive': data['num_positive'],
            'num_negative': data['num_negative'],
            'num_neutral': data['num_neutral'],
        },
        index=[0],
    )

    # Plot; df.plot creates its own figure, so pass figsize here rather than
    # opening a separate (and otherwise leaked) plt.figure first
    bar_colors = ['green', 'red', 'blue']  # Colors for positive, negative, neutral
    df.plot(kind='bar', figsize=(8, 6), color=bar_colors, legend=True)

    # Add titles and labels
    plt.title('Sentiment Analysis Results')
    plt.xlabel('Sentiment Types')
    plt.ylabel('Number of Comments')
    plt.xticks(ticks=[0], labels=['Sentiments'], rotation=0)
    plt.legend(['Positive', 'Negative', 'Neutral'])

    # Save the plot to a BytesIO object and close the figure to prevent display
    buf = BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    plt.close()

    # Open the image from the BytesIO object with PIL
    image = Image.open(buf)
    return image
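
# Example usage with the dict returned by comments_analyzer:
#   chart = create_sentiment_analysis_chart(data)
#   chart.save('sentiment_chart.png')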
############################################################################################################################################
# End-to-end pipeline: collects the comments, runs the sentiment analysis,
# and builds the word cloud and sentiment chart for the Gradio interface.
def process_youtube_comments(youtube_link, max_comments, stop_words):
    start_time = time.time()
    # Pull comments from the YouTube video
    comments_df = comments_collector(video_link=youtube_link, max_comments=int(max_comments))
    end_time = time.time()
    print(f"Time taken for loading comments: {end_time - start_time} seconds")

    if comments_df is None or comments_df.empty:
        raise gr.Error("No comments could be retrieved. Please check the video link and try again.")

    # Analyze sentiment
    analysis_dict, top_positive_comments, top_negative_comments = comments_analyzer(comments_df)
    long_text = analysis_dict['blended_comments']

    start_time = time.time()
    # Generate the word cloud, excluding any user-specified words
    user_stopwords = [word.strip() for word in (stop_words or '').split(',') if word.strip()]
    word_cloud_img = generate_wordcloud(long_text, additional_stopwords=['Timestamps', 'timestamps'] + user_stopwords)
    end_time = time.time()
    print(f"Time taken for generating word clouds: {end_time - start_time} seconds")

    start_time = time.time()
    # Create the sentiment chart
    sentiment_chart = create_sentiment_analysis_chart(analysis_dict)
    end_time = time.time()
    print(f"Time taken for creating sentiment chart: {end_time - start_time} seconds")

    # Return the word cloud image, top comments, and sentiment analysis chart
    return word_cloud_img, top_positive_comments, top_negative_comments, sentiment_chart
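
# Called directly by the Gradio interface below; a manual call such as
#   process_youtube_comments("https://youtu.be/abc123XYZ_-", 50, "intro, outro")
# (illustrative link) returns (word_cloud_img, top_positive, top_negative, sentiment_chart).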
############################################################################################################################################
# Gradio interface
interface = gr.Interface(
    fn=process_youtube_comments,
    inputs=[
        gr.Textbox(label="YouTube Video Link"),
        gr.Number(label="Maximum Comments", value=50),
        gr.Textbox(label="Words to exclude from cloud (comma-separated)")
    ],
    outputs=[
        gr.Image(label="Word Cloud ☁️"),
        gr.Textbox(label="Top 5 Positive Comments 👍🏻"),
        gr.Textbox(label="Top 5 Negative Comments 👎🏻"),
        gr.Image(label="Sentiment Analysis Chart 📊")
    ],
    title="YouTube Comments Analyzer 📈",
    description="Enter a YouTube link to generate a word cloud, top positive and negative comments, and a sentiment analysis of the comments. \n\nNote: The app is both desktop 🖥️/mobile 📱 compatible. Depending on the number of comments found, it can take up to 1-2 minutes to process. Have fun 😀!"
)
# Run the interface
interface.launch()
############################################################################################################################################