|
import gradio as gr
from transformers import pipeline
from wordcloud import WordCloud, STOPWORDS
from youtubesearchpython import Comments
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
import re
from io import BytesIO
import time


# Load the sentiment model once at startup so every request reuses it.
sentiment_task = pipeline(
    "sentiment-analysis",
    model="cardiffnlp/twitter-roberta-base-sentiment-latest",
    tokenizer="cardiffnlp/twitter-roberta-base-sentiment-latest",
)
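
# Illustrative output shape of the pipeline (scores are made up):
#   sentiment_task(["Loved this video!"])
#   -> [{'label': 'positive', 'score': 0.98}]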
|
|
|
def extract_youtube_video_id(url_or_id):
    """
    Extracts the YouTube video ID from a given URL, or returns the input
    unchanged if it already looks like an 11-character video ID.

    Args:
        url_or_id (str): A YouTube URL or a video ID.

    Returns:
        str: The extracted YouTube video ID, or an error message if no
        ID could be found.
    """
    # A bare video ID: exactly 11 characters drawn from the ID alphabet.
    if len(url_or_id) == 11 and not re.search(r'[^0-9A-Za-z_-]', url_or_id):
        return url_or_id

    # Common URL shapes; the optional "www."/"m." prefix also covers
    # mobile links and URLs written without a subdomain.
    regex_patterns = [
        r'(?:https?://)?(?:www\.|m\.)?youtube\.com/watch\?v=([0-9A-Za-z_-]{11})',
        r'(?:https?://)?youtu\.be/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?(?:www\.|m\.)?youtube\.com/embed/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?(?:www\.|m\.)?youtube\.com/v/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?(?:www\.|m\.)?youtube\.com/shorts/([0-9A-Za-z_-]{11})',
    ]

    for pattern in regex_patterns:
        match = re.search(pattern, url_or_id)
        if match:
            return match.group(1)

    return "Invalid YouTube URL or ID"
|
|
|
def comments_collector(video_link, max_comments=100):
    """
    Collect up to roughly max_comments comments for a YouTube video.

    Args:
        video_link (str): A YouTube URL or video ID.
        max_comments (int): Approximate upper bound on comments to fetch;
            pages arrive in chunks, so the final count may overshoot slightly.

    Returns:
        pandas.DataFrame or None: One row per comment with columns
        comment_id, author, content, likes, and replyCount; None if
        retrieval fails.
    """
    video_id = extract_youtube_video_id(video_link)

    try:
        comments = Comments(video_id)
        print('Comments retrieved, loading...')

        # Keep fetching pages until we have at least max_comments
        # comments or the video has no more to give.
        while comments.hasMoreComments and len(comments.comments["result"]) < max_comments:
            comments.getNextComments()
        print(f'Found {len(comments.comments["result"])} comments.')

        comments = comments.comments

        data = []
        for result in comments['result']:
            # Skip comments written by the channel owner.
            if result['authorIsChannelOwner']:
                continue

            comment_dict = {}
            comment_dict['comment_id'] = result['id']
            comment_dict['author'] = result['author']['name']
            comment_dict['content'] = result['content']

            # The vote label is None, a plain count ("123"), or an
            # abbreviated count ("1.2K" / "1.2M"); normalize to an int.
            label = result['votes']['label']
            if label is None:
                likes = 0
            else:
                likes = label.split(' ')[0]
                if 'K' in likes:
                    likes = int(float(likes.replace('K', '')) * 1_000)
                elif 'M' in likes:
                    likes = int(float(likes.replace('M', '')) * 1_000_000)
                else:
                    likes = int(likes)
            comment_dict['likes'] = likes

            reply_count = result['replyCount']
            comment_dict['replyCount'] = 0 if reply_count is None else int(reply_count)

            data.append(comment_dict)

        print(f'Excluding author comments, we ended up with {len(data)} comments.')
        return pd.DataFrame(data)
    except Exception as e:
        print(e)
        return None
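
# Example usage (illustrative; requires network access and a real video):
#   df = comments_collector("https://youtu.be/<video_id>", max_comments=50)
#   list(df.columns) -> ['comment_id', 'author', 'content', 'likes', 'replyCount']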
|
|
|
def comments_analyzer(comments_df):
    """
    Run batched sentiment analysis over a DataFrame of comments.

    Adds 'sentiment' and 'score' columns to comments_df in place, then
    returns a summary dict (comment counts per sentiment, the blended
    comment text, and the percentage of positive comments) along with
    the top positive and top negative comments.
    """
    if comments_df is None:
        return None, None, None

    start_time = time.time()

    # Classify comments in small batches; truncation guards against
    # comments longer than the model's maximum input length.
    batch_size = 20
    sentiments = []
    scores = []

    for i in range(0, len(comments_df), batch_size):
        batch = comments_df['content'].iloc[i:i + batch_size].tolist()
        batch_results = sentiment_task(batch, truncation=True)

        sentiments.extend(item['label'] for item in batch_results)
        scores.extend(item['score'] for item in batch_results)

    comments_df['sentiment'] = sentiments
    comments_df['score'] = scores

    end_time = time.time()
    print(f"Time taken for batch sentiment analysis: {end_time - start_time} seconds")

    def get_top_comments(comments, sentiment_type, top_n=3):
        # Highest-confidence comments of the requested sentiment.
        filtered_comments = comments[comments['sentiment'] == sentiment_type]
        top_comments = filtered_comments.nlargest(top_n, 'score')

        if not top_comments.empty:
            return '\n\n'.join(f"{row['content']} - {row['author']}" for _, row in top_comments.iterrows())
        else:
            return f"No {sentiment_type} comments available."

    start_time = time.time()
    top_positive_comments = get_top_comments(comments_df, 'positive')
    top_negative_comments = get_top_comments(comments_df, 'negative')
    end_time = time.time()
    print(f"Time taken for finding top positive/negative comments: {end_time - start_time} seconds")

    data = {}
    sentiment_counts = comments_df['sentiment'].value_counts()
    data['total_comments'] = len(comments_df)
    data['num_positive'] = sentiment_counts.get('positive', 0)
    data['num_neutral'] = sentiment_counts.get('neutral', 0)
    data['num_negative'] = sentiment_counts.get('negative', 0)

    # All comments joined into one string, used for the word cloud.
    data['blended_comments'] = comments_df['content'].str.cat(sep=' ')
    data['pct_positive'] = 100 * round(data['num_positive'] / data['total_comments'], 2)

    return data, top_positive_comments, top_negative_comments
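
# Shape of the summary dict (illustrative values):
#   {'total_comments': 100, 'num_positive': 60, 'num_neutral': 30,
#    'num_negative': 10, 'blended_comments': '...', 'pct_positive': 60.0}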
|
|
|
def generate_wordcloud(long_text, additional_stopwords=('Timestamps', 'timestamps')):
    """
    Generate a word cloud image from the blended comment text.

    Args:
        long_text (str): All comments concatenated into one string.
        additional_stopwords (iterable): Extra words to exclude.

    Returns:
        PIL.Image.Image: The rendered word cloud.
    """
    # Combine the library's default stopwords with any user-supplied ones.
    stopwords = set(STOPWORDS)
    all_stopwords = stopwords.union(additional_stopwords)

    wordcloud = WordCloud(
        max_font_size=50,
        max_words=20,
        background_color="black",
        stopwords=all_stopwords,
        colormap='plasma',
    ).generate(long_text)

    plt.figure(figsize=(10, 10), facecolor=None)
    plt.imshow(wordcloud, interpolation="bilinear")
    plt.axis("off")
    plt.tight_layout(pad=0)

    # Render to an in-memory buffer and hand back a PIL image.
    img_buf = BytesIO()
    plt.savefig(img_buf, format='png', bbox_inches='tight', pad_inches=0)
    img_buf.seek(0)
    plt.close()

    image = Image.open(img_buf)
    return image
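
# Example (illustrative): exclude extra channel-specific words from the cloud.
#   img = generate_wordcloud(long_text, additional_stopwords=['giveaway', 'subscribe'])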
|
|
|
def create_sentiment_analysis_chart(data):
    """
    Build a bar chart of positive, negative, and neutral comment counts.

    Args:
        data (dict): Summary dict produced by comments_analyzer.

    Returns:
        PIL.Image.Image: The rendered chart.
    """
    # A one-row DataFrame so each sentiment count becomes its own bar.
    df = {}
    df['num_positive'] = data['num_positive']
    df['num_negative'] = data['num_negative']
    df['num_neutral'] = data['num_neutral']
    df = pd.DataFrame(df, index=[0])

    # Let pandas create the figure itself; a separate plt.figure() call
    # here would be left orphaned, since df.plot() opens its own figure.
    bar_colors = ['green', 'red', 'blue']
    df.plot(kind='bar', figsize=(8, 6), color=bar_colors, legend=True)

    plt.title('Sentiment Analysis Results')
    plt.xlabel('Sentiment Types')
    plt.ylabel('Number of Comments')
    plt.xticks(ticks=[0], labels=['Sentiments'], rotation=0)
    plt.legend(['Positive', 'Negative', 'Neutral'])

    # Render to an in-memory buffer and hand back a PIL image.
    buf = BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    plt.close()

    image = Image.open(buf)
    return image
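
# Example (illustrative): render a chart from a hand-built summary dict.
#   img = create_sentiment_analysis_chart({'num_positive': 60, 'num_negative': 10, 'num_neutral': 30})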
|
|
def process_youtube_comments(youtube_link, max_comments, stop_words):
    """
    End-to-end pipeline behind the Gradio interface: collect comments,
    run sentiment analysis, and render the word cloud and sentiment chart.
    """
    start_time = time.time()

    # gr.Number delivers a float, so cast before passing it along.
    comments_df = comments_collector(video_link=youtube_link, max_comments=int(max_comments))

    end_time = time.time()
    print(f"Time taken for loading comments: {end_time - start_time} seconds")

    if comments_df is None or comments_df.empty:
        raise gr.Error("Could not retrieve comments for this video. Check the link and try again.")

    analysis_dict, top_positive_comments, top_negative_comments = comments_analyzer(comments_df)

    long_text = analysis_dict['blended_comments']

    start_time = time.time()

    # Merge the user-supplied stop words (comma-separated) with the defaults.
    additional_stopwords = ['Timestamps', 'timestamps']
    if stop_words:
        additional_stopwords.extend(word.strip() for word in stop_words.split(',') if word.strip())

    word_cloud_img = generate_wordcloud(long_text, additional_stopwords=additional_stopwords)

    end_time = time.time()
    print(f"Time taken for generating the word cloud: {end_time - start_time} seconds")

    start_time = time.time()
    sentiment_chart = create_sentiment_analysis_chart(analysis_dict)
    end_time = time.time()
    print(f"Time taken for creating sentiment chart: {end_time - start_time} seconds")

    return word_cloud_img, top_positive_comments, top_negative_comments, sentiment_chart
|
|
interface = gr.Interface(
    fn=process_youtube_comments,
    inputs=[
        gr.Textbox(label="YouTube Video Link"),
        gr.Number(label="Maximum Comments", value=100),
        gr.Textbox(label="Words to exclude from cloud (comma-separated)")
    ],
    outputs=[
        gr.Image(label="Word Cloud ☁️"),
        gr.Textbox(label="Top 3 Positive Comments 👍🏻"),
        gr.Textbox(label="Top 3 Negative Comments 👎🏻"),
        gr.Image(label="Sentiment Analysis Chart 📊")
    ],
    title="YouTube Comments Analyzer 🔍",
    description="Enter a YouTube link to generate a word cloud, the top positive and negative comments, and a sentiment analysis of the comments.\n\nNote: the app works on both desktop 🖥️ and mobile 📱. Depending on how many comments are found, processing can take 1-2 minutes. Have fun 🎉!"
)
|
|
if __name__ == "__main__":
    interface.launch()
|
|