"""Bubble Check-In: a Gradio app that summarises the content a Twitter user is exposed to.

For a given username the app collects recent mentions of that user and recent
tweets from accounts they follow, scores each tweet for toxicity (Detoxify),
sentiment (transformers pipeline) and news topic, and renders plots plus a
short textual summary.
"""
import gc
import os
import random
import subprocess
import sys
import time

import gradio as gr
import plotly.graph_objects as go
import tweepy
from detoxify import Detoxify
from transformers import pipeline

try:
    from news_classification.news_topic_text_classifier import news_topic_text_classifier
except ImportError:
    # The classifier is not published on PyPI; install it from source on first run.
    # An argument list (no shell) is used so the interpreter path is never shell-parsed.
    subprocess.run(
        [sys.executable, "-m", "pip", "install",
         "git+https://github.com/user1342/News-Article-Text-Classification.git"],
        check=True,
    )
    from news_classification.news_topic_text_classifier import news_topic_text_classifier

news_model = news_topic_text_classifier()

# Twitter API keys
consumer_token = os.getenv('consumer_token')
consumer_secret = os.getenv('consumer_secret')
my_access_token = os.getenv('my_access_token')
my_access_secret = os.getenv('my_access_secret')
bearer = os.getenv('bearer')

# Seconds to wait before retrying after the Twitter API reports a rate limit.
RATE_LIMIT_SLEEP_SECONDS = 120

# Only the first N accounts the target follows are analysed.
MAX_FOLLOWING = 50

# Score names read from the Detoxify 'unbiased' model output.
TOXICITY_LABELS = (
    "toxicity",
    "severe_toxicity",
    "obscene",
    "identity_attack",
    "insult",
    "threat",
    "sexual_explicit",
)

# Wording used in the summary for each toxicity score.
TOXICITY_DESCRIPTIONS = {
    "toxicity": "toxic",
    "severe_toxicity": "severe toxic",
    "obscene": "obscene",
    "identity_attack": "identity based hate",
    "insult": "insulting",
    "threat": "threatening",
    "sexual_explicit": "sexually explicit",
}

# Detoxify scores are probabilities in the range 0..1, so the thresholds are fractions.
HIGH_TOXICITY_THRESHOLD = 0.5
EXTREME_TOXICITY_THRESHOLD = 0.75

HIGH_NEGATIVE_SENTIMENT_THRESHOLD = 0.7
EXTREME_NEGATIVE_SENTIMENT_THRESHOLD = 0.9

html_data = '''

Bubble Check-In🐦💭

Check-in-on someone's Twitter 'bubble'.

Scroll down to use Bubble Check-In 1.0. ⬇ Bubble Check-In is a tool designed to allow you to check-in-on the type of content someone on Twitter is being exposed to - be that yourself, a friend, loved one, etc. The goal here is to empower users to look out for each-other and identify early if someone is experiencing activity such as hate speech or extremism. We use a queue system, which means you may need to wait your turn to run Bubble Check-In. Bubble Check-In is simple to use simply enter the username of the Twitter account you want to check-in-on and click run!


'''

# Setup the gradio block and add some generic CSS
block = gr.Blocks(
    css=".container { max-width: 800px; margin: auto; } "
        "h1 { margin: 0px; padding: 5px 0; line-height: 50px; font-size: 60pt; }"
        ".close-heading {margin: 0px; padding: 0px;} "
        ".close-heading p { margin: 0px; padding: 0px;}",
    title="Bubble Check-In")


def _mean(values):
    """Return the arithmetic mean of *values*, or 0.0 when *values* is empty."""
    return sum(values) / len(values) if values else 0.0


def _call_with_rate_limit_retry(api_call, **kwargs):
    """Call a tweepy client method, sleeping and retrying while it is rate limited.

    :param api_call: the bound tweepy client method to invoke
    :param kwargs: keyword arguments forwarded to ``api_call``
    :return: whatever ``api_call`` returns once it succeeds
    """
    while True:
        try:
            return api_call(**kwargs)
        except tweepy.errors.TooManyRequests as error:
            print("sleeping")
            print(error)
            time.sleep(RATE_LIMIT_SLEEP_SECONDS)


def check_connected_users(username):
    """
    This function retrieves all of the mentions for the given user and all of the
    tweets from their following, then scores every collected tweet.

    :param username: the target user
    :return: a dict keyed by tweepy user objects. Each value holds "tweets" (list of
        text) and "type" (list containing "mentioned" and/or "following"); users with
        at least one tweet also get "average_<label>" for every toxicity label,
        "types" (news category -> count) and "sentiments" (label -> list of scores).
    :raises ValueError: if the Twitter API returns no data for ``username``
    """
    client = tweepy.Client(
        bearer_token=bearer,
        consumer_key=consumer_token,
        consumer_secret=consumer_secret,
        access_token=my_access_token,
        access_token_secret=my_access_secret
    )

    target_user = _call_with_rate_limit_retry(client.get_user, username=username).data
    if target_user is None:
        # Previously this surfaced as an opaque AttributeError on ``None``.
        raise ValueError("Twitter user '{}' could not be found.".format(username))
    user_id = target_user.id

    tweet_data_dict = {}

    # Get users that have mentioned the target user
    users_mentions = _call_with_rate_limit_retry(
        client.get_users_mentions, id=user_id, tweet_fields=["author_id"], max_results=10
    ).data or []

    # Counters come from enumerate so a rate-limit retry can never double count.
    for mention_count, tweet in enumerate(users_mentions, start=1):
        user = _call_with_rate_limit_retry(client.get_user, id=tweet.author_id).data
        if user is None:
            # The author could not be resolved; skip.
            continue
        print("Processing user {}'s mentions. Mention {} of {}. Mention from user {}".format(
            username, mention_count, len(users_mentions), user))

        # Is this the first time adding a tweet from this user, if so act accordingly
        entry = tweet_data_dict.setdefault(user, {"tweets": []})
        entry["tweets"].append(tweet.data["text"])
        # Adds the mention type to the user data
        entry["type"] = ["mentioned"]

    # Loop through all users that the target user is following.
    # ``.data`` is None when the target follows nobody, hence the ``or []``.
    following = _call_with_rate_limit_retry(
        client.get_users_following, id=user_id, max_results=1000
    ).data or []
    # Only take at a maximum the first MAX_FOLLOWING accounts
    following = following[:MAX_FOLLOWING]

    for user_count, user in enumerate(following, start=1):
        # If the user hasn't already been observed in mentions then create a new entry
        entry = tweet_data_dict.setdefault(user, {"tweets": []})
        # Adds the following type to the user data (exactly once per user)
        entry.setdefault("type", []).append("following")

        tweets = _call_with_rate_limit_retry(client.get_users_tweets, id=user.id, max_results=5).data
        if tweets is not None:
            print("Processing user {}'s followers. {}, number {} of {}. Total user tweets {}.".format(
                username, user, user_count, len(following), len(tweets)))
            entry["tweets"].extend(str(users_tweet.text) for users_tweet in tweets)

    # Do processing such as sentiment, hate speech, topic, etc.
    # Both models are loaded once here; loading Detoxify per tweet is extremely slow.
    sentiment_pipeline = pipeline("sentiment-analysis")
    toxicity_model = Detoxify('unbiased')

    for current_username, current_user_data in tweet_data_dict.items():
        user_tweets = current_user_data["tweets"]

        # Only consider users with posts for analysis
        if not user_tweets:
            continue

        print("Processing metadata for {}'s tweets".format(current_username))

        toxicities = {label: [] for label in TOXICITY_LABELS}
        sentiments = {}
        types = {}

        for tweet in user_tweets:
            # Do hate speech scores
            scores = toxicity_model.predict([tweet])
            for label in TOXICITY_LABELS:
                toxicities[label].append(scores[label][0])

            # Do sentiment analysis
            sentiment_score = sentiment_pipeline(tweet)[0]
            sentiment_label = sentiment_score["label"]
            if sentiment_label in ("NEGATIVE", "POSITIVE"):
                sentiments.setdefault(sentiment_label, []).append(sentiment_score["score"])

            # Do type of post (news)
            category = news_model.get_category(tweet)
            types[category] = types.get(category, 0) + 1

        for label in TOXICITY_LABELS:
            current_user_data["average_" + label] = _mean(toxicities[label])
        current_user_data["types"] = types
        current_user_data["sentiments"] = sentiments

    gc.collect()
    return tweet_data_dict


def _build_summary_text(text_box, tweets, mentions, following, toxicity_averages,
                        average_neg_sentiment, average_pos_sentiment):
    """Compose the human readable summary shown in the 'Summary' label.

    :param text_box: the username that was checked
    :param tweets: total number of tweets reviewed
    :param mentions: number of analysed users seen via mentions
    :param following: number of analysed users seen via following
    :param toxicity_averages: dict of toxicity label -> average score
    :param average_neg_sentiment: average score of tweets labelled NEGATIVE
    :param average_pos_sentiment: average score of tweets labelled POSITIVE
    :return: the summary string
    """
    original_text = "A total number of {} recent tweets in @{}'s mentions and timeline were reviewed, of which @{} was exposed to {} users via mentions and " \
                    "{} directly via following them.".format(tweets, text_box, text_box, mentions, following)
    text = original_text

    high_identifiers = []
    extreme_identifiers = []
    for label in TOXICITY_LABELS:
        score = toxicity_averages[label]
        if score > EXTREME_TOXICITY_THRESHOLD:
            extreme_identifiers.append(TOXICITY_DESCRIPTIONS[label])
        elif score > HIGH_TOXICITY_THRESHOLD:
            high_identifiers.append(TOXICITY_DESCRIPTIONS[label])

    if high_identifiers:
        text += " @{} is observing a high amount of {} language.".format(
            text_box, ", ".join(high_identifiers))

    if extreme_identifiers:
        text += " @{} is observing an extremely high amount of {} language.".format(
            text_box, ", ".join(extreme_identifiers))

    if average_neg_sentiment > average_pos_sentiment:
        # The stricter threshold is tested first; otherwise it could never be reached.
        if average_neg_sentiment > EXTREME_NEGATIVE_SENTIMENT_THRESHOLD:
            text += " @{} is experiencing a significantly high amount of negative sentiment content.".format(
                text_box)
        elif average_neg_sentiment > HIGH_NEGATIVE_SENTIMENT_THRESHOLD:
            text += " @{} is experiencing a high amount of negative sentiment content.".format(text_box)

    if text == original_text:
        text += " No excessive hate speech or low sentiment was observed in @{}'s mentions or timeline.".format(
            text_box)

    return text


def button_pressed(text_box):
    """
    A function that is called when the 'run' button is pressed

    :param text_box: a string which should relate to a Twitter users username
    :return: a list of [hate speech figure, sentiment figure, content type figure,
        summary text, per-user figure, distribution figure], in the order expected
        by the click handler's outputs
    """
    tweet_data = check_connected_users(text_box)

    total_types_count = {}
    per_user_averages = {label: [] for label in TOXICITY_LABELS}
    pos_sentiment_scores = []
    neg_sentiment_scores = []
    mentions = 0
    following = 0
    tweets = 0
    user_data = {}

    for user, data in tweet_data.items():
        tweets += len(data["tweets"])

        # Users without tweets were never scored, so they carry no averages.
        if not data["tweets"]:
            continue

        if "mentioned" in data["type"]:
            mentions += 1
        if "following" in data["type"]:
            following += 1

        # Count how many analysed users posted each content type
        for category in data["types"]:
            total_types_count[category] = total_types_count.get(category, 0) + 1

        for label in TOXICITY_LABELS:
            per_user_averages[label].append(data["average_" + label])
        user_data[user.name] = data["average_toxicity"]

        neg_sentiment_scores.extend(data["sentiments"].get("NEGATIVE", []))
        pos_sentiment_scores.extend(data["sentiments"].get("POSITIVE", []))

    # _mean returns 0.0 for empty input, so an account with no data (or with no
    # tweets of one sentiment label) no longer raises ZeroDivisionError.
    toxicity_averages = {label: _mean(per_user_averages[label]) for label in TOXICITY_LABELS}
    total_average_neg_sentiment = _mean(neg_sentiment_scores)
    total_average_pos_sentiment = _mean(pos_sentiment_scores)

    # Comprise elements for hate speech plot
    toxicity_plot_fig = go.Figure({
        "data": [{"type": "bar",
                  "x": ["Average Toxicity", "Average Severe Toxicity", "Average Obscene",
                        "Average Identity Attack", "Average Insult", "Average Threat",
                        "Average Sexual Explicit"],
                  "y": [toxicity_averages[label] for label in TOXICITY_LABELS]}],
        "layout": {"title": {"text": "Hate Speech"}}
    })

    # Comprise elements for sentiment plot
    sentiment_plot_fig = go.Figure({
        "data": [{"type": "bar",
                  "x": ["Positive Sentiment Average", "Negative Sentiment Average"],
                  "y": [total_average_pos_sentiment, total_average_neg_sentiment]}],
        "layout": {"title": {"text": "Sentiment"}}
    })

    # User distribution plot
    user_plot_fig = go.Figure({
        "data": [{"type": "bar",
                  "x": list(user_data.keys()),
                  "y": list(user_data.values())}],
        "layout": {"title": {"text": "Hate Speech By Observed User"}}
    })

    # Distribution pie
    distrabution_fig = go.Figure(data=[go.Pie(labels=['Timeline', 'Mentions'],
                                              values=[following, mentions],
                                              title="Distribution Of Observed Users")])

    # Comprise elements for 'type' plot
    x_list = [key.replace("_", " ").title() for key in total_types_count]
    type_counts = list(total_types_count.values())
    colours = ['rgb({}, {}, {})'.format(random.randint(1, 255), random.randint(1, 255), random.randint(1, 255))
               for _ in total_types_count]
    sizes = [count * 20 for count in type_counts]

    fig = go.Figure(data=[go.Scatter(
        x=x_list,
        y=type_counts,
        mode='markers',
        marker=dict(
            color=colours,
            size=sizes
        )
    )])

    # Comprise text for summary label
    text = _build_summary_text(text_box, tweets, mentions, following, toxicity_averages,
                               total_average_neg_sentiment, total_average_pos_sentiment)

    return [toxicity_plot_fig, sentiment_plot_fig, fig, text, user_plot_fig, distrabution_fig]


# The main chunk of code that uses Gradio blocks to create the UI
# NOTE(review): the nesting of the layout below was reconstructed from a flattened
# source; confirm it against the intended page layout.
html_button = None
with block:
    gr.HTML(''' ''')
    # TODO: check if user signed in
    gr.HTML(value=html_data)
    with gr.Group():
        with gr.Row().style(equal_height=True):
            with gr.Box():
                with gr.Row().style(equal_height=True):
                    text_input = gr.Text(label="Username", visible=True, max_lines=1)
                    btn = gr.Button("Run Bubble Check-In").style(full_width=True)

        # NOTE(review): these spacer values contain only a newline in the visible
        # source; they may originally have held markup such as a line break - confirm.
        gr.HTML(value="\n")
        output_label = gr.Label(label="Summary")
        gr.HTML(value="\n")
        with gr.Row().style(equal_height=True):
            toxicity_plot = gr.Plot(label="Hate Speech Graph")
            sentiment_plot = gr.Plot(label="Sentiment Graph")
        gr.HTML(value="\n")
        type_plot = gr.Plot(label="Content Type Graph")
        gr.HTML(value="\n")
        with gr.Row().style(equal_height=True):
            user_plot = gr.Plot(label="Observed Users")
            format_type_plot = gr.Plot(label="Distribution")

        btn.click(fn=button_pressed, inputs=[text_input],
                  outputs=[toxicity_plot, sentiment_plot, type_plot, output_label, user_plot, format_type_plot])

    gr.Markdown(
        """___

Created by James Stevenson

"""
    )

# Launch the page
block.launch(enable_queue=True, show_api=False)