import pandas as pd
import numpy as np
import snscrape.modules.twitter as sntwitter
import streamlit as st
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import pipeline
import plotly.express as px
import plotly.io as pio
import plotly.graph_objects as go
import matplotlib as mpl
import matplotlib.pyplot as plt
from wordcloud import WordCloud
from PIL import Image
import requests
from itertools import islice
from youtube_comment_downloader import *


@st.cache(allow_output_mutation=True)
def get_nltk():
    # Download the NLTK resources needed for tokenization, POS tagging and lemmatization.
    import nltk

    nltk.download(
        ["punkt", "wordnet", "omw-1.4", "averaged_perceptron_tagger", "universal_tagset"]
    )


get_nltk()

from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
from nltk.tokenize import word_tokenize
import re
from sklearn.feature_extraction.text import CountVectorizer

# Create a custom plotly theme and set it as default
pio.templates["custom"] = pio.templates["plotly_white"]
pio.templates["custom"].layout.margin = {"b": 25, "l": 25, "r": 25, "t": 50}
pio.templates["custom"].layout.width = 600
pio.templates["custom"].layout.height = 450
pio.templates["custom"].layout.autosize = False
pio.templates["custom"].layout.font.update(
    {"family": "Arial", "size": 12, "color": "#707070"}
)
pio.templates["custom"].layout.title.update(
    {
        "xref": "container",
        "yref": "container",
        "x": 0.5,
        "yanchor": "top",
        "font_size": 16,
        "y": 0.95,
        "font_color": "#353535",
    }
)
pio.templates["custom"].layout.xaxis.update(
    {"showline": True, "linecolor": "lightgray", "title_font_size": 14}
)
pio.templates["custom"].layout.yaxis.update(
    {"showline": True, "linecolor": "lightgray", "title_font_size": 14}
)
pio.templates["custom"].layout.colorway = [
    "#1F77B4",
    "#FF7F0E",
    "#54A24B",
    "#D62728",
    "#C355FA",
    "#8C564B",
    "#E377C2",
    "#7F7F7F",
    "#FFE323",
    "#17BECF",
]
pio.templates.default = "custom"


@st.cache(allow_output_mutation=True)
def get_sentiment_model():
    # Load and cache the FinBERT sentiment model so it is only downloaded once per session.
    tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
    model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert")
    return tokenizer, model


tokenizer_sentiment, model_sentiment = get_sentiment_model()


def get_tweets(query, max_tweets):
    # A query starting with "@" is treated as a username; anything else is a keyword search.
    if query[0] == '@':
        query = query[1:]
        tweets_list = []
        # Using TwitterSearchScraper to scrape data
        for i, tweet in enumerate(sntwitter.TwitterSearchScraper('from:' + query).get_items()):
            if i >= max_tweets:
                break
            tweets_list.append([tweet.date, tweet.user.username, tweet.content])
        # Creating a dataframe from the tweets list above
        tweets_df = pd.DataFrame(tweets_list, columns=['Datetime', 'Username', 'Tweet'])
    else:
        # Creating list to append tweet data to
        tweets_list = []
        # Using TwitterSearchScraper to scrape data and append tweets to list
        for i, tweet in enumerate(sntwitter.TwitterSearchScraper(query).get_items()):
            if i >= max_tweets:
                break
            tweets_list.append([tweet.date, tweet.user.username, tweet.content])
        # Creating a dataframe from the tweets list above
        tweets_df = pd.DataFrame(tweets_list, columns=['Datetime', 'Username', 'Tweet'])
    # Split the timestamp into separate date and time columns.
    tweets_df['Datetime'] = pd.to_datetime(tweets_df['Datetime'])
    tweets_df['Date'] = tweets_df['Datetime'].dt.date
    tweets_df['Time'] = tweets_df['Datetime'].dt.strftime('%H:%M')
    tweets_df.drop('Datetime', axis=1, inplace=True)
    return tweets_df


def get_youtube_comments(url, num_comments):
    # YouTube embeds this playability status in the page source when a video is unavailable.
    pattern = '"playabilityStatus":{"status":"ERROR","reason":"Video unavailable"'

    def try_site(url):
        request = requests.get(url)
        return pattern not in request.text

    video_exists = try_site(url)
    if video_exists:
        comment_list = []
        downloader = YoutubeCommentDownloader()
        comments = downloader.get_comments_from_url(url, sort_by=SORT_BY_POPULAR)
        for comment in islice(comments, num_comments):
            comment_list.append(comment['text'])
        return comment_list
    else:
        raise Exception('Video does not exist')
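
# Example usage (illustrative sketch, not part of the original app flow; the handle,
# keyword, URL and counts below are placeholders):
#
#   tweets_df = get_tweets("@some_user", max_tweets=100)   # columns: Username, Tweet, Date, Time
#   tweets_df = get_tweets("bitcoin", max_tweets=100)      # keyword search instead of a user timeline
#   comments = get_youtube_comments("https://www.youtube.com/watch?v=VIDEO_ID", num_comments=50)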


def get_sentiment_youtube(useful_sentence):
    # Run FinBERT over each comment and return a DataFrame of comments with sentiment labels.
    classifier = pipeline(
        "text-classification", model=model_sentiment, tokenizer=tokenizer_sentiment
    )
    output = []
    for temp in useful_sentence:
        output.extend(classifier(temp))
    df = pd.DataFrame({'Comment': useful_sentence})
    df_temp = pd.DataFrame.from_dict(output)
    df = pd.concat([df, df_temp], axis=1)
    df = df.rename(columns={'label': 'Sentiment'})
    df['Sentiment'] = df['Sentiment'].replace(
        {'positive': 'Positive', 'negative': 'Negative', 'neutral': 'Neutral'}
    )
    return df


def text_preprocessing(text):
    # Clean a tweet for the wordcloud: strip URLs, mentions and HTML entities,
    # expand "n't" contractions, keep alphabetic tokens, then lemmatize with POS tags.
    stopwords = set()
    with open("static/en_stopwords.txt", "r") as file:
        for word in file:
            stopwords.add(word.rstrip("\n"))
    lemmatizer = WordNetLemmatizer()
    try:
        url_pattern = r"((http://)[^ ]*|(https://)[^ ]*|(www\.)[^ ]*)"
        user_pattern = r"@[^\s]+"
        entity_pattern = r"&.*;"
        neg_contraction = r"n't\W"
        non_alpha = "[^a-z]"
        cleaned_text = text.lower()
        cleaned_text = re.sub(neg_contraction, " not ", cleaned_text)
        cleaned_text = re.sub(url_pattern, " ", cleaned_text)
        cleaned_text = re.sub(user_pattern, " ", cleaned_text)
        cleaned_text = re.sub(entity_pattern, " ", cleaned_text)
        cleaned_text = re.sub(non_alpha, " ", cleaned_text)
        tokens = word_tokenize(cleaned_text)
        # provide POS tag for lemmatization to yield better result
        word_tag_tuples = pos_tag(tokens, tagset="universal")
        tag_dict = {"NOUN": "n", "VERB": "v", "ADJ": "a", "ADV": "r"}
        final_tokens = []
        for word, tag in word_tag_tuples:
            if len(word) > 1 and word not in stopwords:
                if tag in tag_dict:
                    final_tokens.append(lemmatizer.lemmatize(word, tag_dict[tag]))
                else:
                    final_tokens.append(lemmatizer.lemmatize(word))
        return " ".join(final_tokens)
    except:
        return np.nan


def get_sentiment(df):
    # Classify each tweet with FinBERT and append the sentiment label to the DataFrame.
    useful_sentence = df['Tweet'].tolist()
    classifier = pipeline(
        "text-classification", model=model_sentiment, tokenizer=tokenizer_sentiment
    )
    output = []
    for temp in useful_sentence:
        output.extend(classifier(temp))
    df_temp = pd.DataFrame.from_dict(output)
    df = pd.concat([df, df_temp], axis=1)
    df = df.rename(columns={'label': 'Sentiment'})
    df['Sentiment'] = df['Sentiment'].replace(
        {'positive': 'Positive', 'negative': 'Negative', 'neutral': 'Neutral'}
    )
    return df


def plot_sentiment(tweet_df):
    # Donut chart of the sentiment label counts.
    sentiment_count = tweet_df["Sentiment"].value_counts()
    fig = px.pie(
        values=sentiment_count.values,
        names=sentiment_count.index,
        hole=0.3,
        title="Sentiment Distribution",
        color=sentiment_count.index,
        color_discrete_map={
            "Positive": "#54A24B",
            "Negative": "#FF7F0E",
            "Neutral": "#1F77B4",
        },
    )
    fig.update_traces(
        textposition="inside",
        texttemplate="%{label}<br>%{value} (%{percent})",
        hovertemplate="%{label}<br>Percentage=%{percent}<br>Count=%{value}",
    )
    fig.update_layout(showlegend=False)
    return fig
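
# Sketch of how the scraping and sentiment helpers chain together (variable names here
# are placeholders, not part of the original app):
#
#   tweets_df = get_tweets("bitcoin", max_tweets=50)
#   scored_df = get_sentiment(tweets_df)                 # adds 'Sentiment' and 'score' columns
#   st.plotly_chart(plot_sentiment(scored_df))           # Positive/Negative/Neutral donut chart
#
#   comments = get_youtube_comments(video_url, num_comments=50)
#   comments_df = get_sentiment_youtube(comments)        # DataFrame with 'Comment' and 'Sentiment'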


def get_top_n_gram(tweet_df, ngram_range, n=10):
    try:
        stopwords = set()
        with open("static/en_stopwords_ngram.txt", "r") as file:
            for word in file:
                stopwords.add(word.rstrip("\n"))
        stopwords = list(stopwords)
        corpus = tweet_df["Tweet"]
        vectorizer = CountVectorizer(
            analyzer="word", ngram_range=ngram_range, stop_words=stopwords
        )
        X = vectorizer.fit_transform(corpus.astype(str).values)
        words = vectorizer.get_feature_names_out()
        words_count = np.ravel(X.sum(axis=0))
        df = pd.DataFrame(zip(words, words_count))
        df.columns = ["words", "counts"]
        df = df.sort_values(by="counts", ascending=False).head(n)
        df["words"] = df["words"].str.title()
        return df
    except:
        pass


def plot_n_gram(n_gram_df, title, color="#54A24B"):
    try:
        fig = px.bar(
            x=n_gram_df.counts,
            y=n_gram_df.words,
            title="{}".format(title),
            text_auto=True,
        )
        fig.update_layout(plot_bgcolor="white")
        fig.update_xaxes(title=None)
        fig.update_yaxes(autorange="reversed", title=None)
        fig.update_traces(hovertemplate="%{y}<br>Count=%{x}", marker_color=color)
        return fig
    except:
        fig = go.Figure()
        return fig
Count=%{x}", marker_color=color) return fig except: fig = go.Figure() return fig def plot_wordcloud(tweet_df, colormap="Greens", mask_url="static/twitter_mask.png"): try: stopwords = set() with open("static/en_stopwords_ngram.txt", "r") as file: for word in file: stopwords.add(word.rstrip("\n")) cmap = mpl.cm.get_cmap(colormap)(np.linspace(0, 1, 20)) cmap = mpl.colors.ListedColormap(cmap[10:15]) mask = np.array(Image.open(mask_url)) font = "static/quartzo.ttf" tweet_df["Cleaned_Tweet"] = tweet_df["Tweet"].apply(text_preprocessing) text = " ".join(tweet_df["Cleaned_Tweet"]) wc = WordCloud( background_color="white", font_path=font, stopwords=stopwords, max_words=90, colormap=cmap, mask=mask, random_state=42, collocations=False, min_word_length=2, max_font_size=200, ) wc.generate(text) fig = plt.figure(figsize=(8, 8)) ax = fig.add_subplot(1, 1, 1) plt.imshow(wc, interpolation="bilinear") plt.axis("off") plt.title("Wordcloud", fontdict={"fontsize": 16}, fontweight="heavy", pad=20, y=1.0) return fig except: fig = go.Figure() return fig