Spaces:
Runtime error
Runtime error
#!/usr/bin/env python | |
# coding: utf-8 | |
import json | |
import os | |
import re | |
import time | |
from random import random | |
import socket | |
from threading import Thread | |
from time import sleep | |
# Twitter keys | |
consumer_token = os.getenv('CONSUMER_TOKEN') | |
consumer_secret = os.getenv('CONSUMER_SECRET') | |
my_access_token = os.getenv('ACCESS_TOKEN') | |
my_access_secret = os.getenv('ACCESS_SECRET') | |
bearer = os.getenv('BEARER') | |
html_data = ''' | |
<!DOCTYPE html> | |
<html> | |
<head> | |
<meta charset="UTF-8"> | |
<meta name="viewport" content="width=device-width, initial-scale=1"> | |
<link rel="stylesheet" href="https://www.w3schools.com/w3css/4/w3.css"> | |
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Poppins"> | |
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css"> | |
<style> | |
body,h1,h2,h3,h4,h5 {font-family: "Poppins", sans-serif} | |
body {font-size: 16px;} | |
img {margin-bottom: -8px;} | |
.mySlides {display: none;} | |
</style> | |
</head> | |
<body class="w3-content w3-black" style="max-width:1500px;"> | |
<!-- The App Section --> | |
<div class="w3-padding-large w3-white"> | |
<div class="w3-row-padding-large"> | |
<div class="w3-col"> | |
<h1 class="w3-jumbo"><b>WatchTower 🐦🚧</b></h1> | |
<h1 class="w3-xxxlarge w3-text-blue"><b>Remove Unfavorable Tweets From Your Feed </b></h1> | |
<p><span class="w3-xlarge">Scroll down to use WatchTower 1.0. ⬇ </span> WatchTower is a tool that identifies hate speech, misinformation, and extremist content and blocks/ mutes it from your Twitter feed. WatchTower blocks content based on it's current database, so make sure to come back regularly to ensure you're up to date! We use a queue system, which means <b> you may need to wait your turn to run WatchTower</b> - however, once you've clicked run, you can close the tab as WatchTower will continue in the background. WatchTower is simple to use: first scroll down the page and click the 'sign in with Twitter' button, then you'll be taken to the Twitter website and asked to verify yourself, after this you'll be taken back here, then simply scroll down to the bottom of the page and click run!</p> | |
<a href="https://www.watchtower.cartographer.one/"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'"> | |
<i class=""></i> Find Out More! 💬 | |
</button></a> | |
<a href="https://ko-fi.com/jamesstevenson"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'"> | |
<i class=""></i> Support The Creator! ❤ | |
</button></a> | |
<a href="https://twitter.com/WATCHTOWER_WEB"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'"> | |
<i class=""></i> Follow Us! 🐦 | |
</button></a> | |
</div> | |
</div> | |
</div> | |
<!-- Modal --> | |
<script> | |
// Slideshow | |
var slideIndex = 1; | |
showDivs(slideIndex); | |
function plusDivs(n) { | |
showDivs(slideIndex += n); | |
} | |
function showDivs(n) { | |
var i; | |
var x = document.getElementsByClassName("mySlides"); | |
if (n > x.length) {slideIndex = 1} | |
if (n < 1) {slideIndex = x.length} | |
for (i = 0; i < x.length; i++) { | |
x[i].style.display = "none"; | |
} | |
x[slideIndex-1].style.display = "block"; | |
} | |
</script> | |
<br> | |
</body> | |
</html> | |
''' | |
# Imports | |
import json | |
import os | |
import time | |
import gradio as gr | |
import tweepy | |
# Setup the gradio block and add some generic CSS | |
block = gr.Blocks(css=".container { max-width: 800px; margin: auto; } h1 { margin: 0px; padding: 5px 0; line-height: 50px; font-size: 60pt; }.close-heading {margin: 0px; padding: 0px;} .close-heading p { margin: 0px; padding: 0px;}", title="WatchTower") | |
# Chat history variable used for the chatbot prompt on the 'getting started' page. | |
chat_history = [] | |
def get_client_from_tokens(oauth_verifier, oauth_token): | |
''' | |
This function is used for generating a Tweepy client object based on Oauth verifier and token paramiters | |
:param oauth_verifier: | |
:param oauth_token: | |
:return: A Tweepy client object | |
''' | |
new_oauth1_user_handler = tweepy.OAuth1UserHandler( | |
consumer_token, consumer_secret, | |
callback="https://hf.space/embed/User1342/WatchTower/" | |
) | |
new_oauth1_user_handler.request_token = { | |
"oauth_token": oauth_token, | |
"oauth_token_secret": consumer_secret | |
} | |
access_token, access_token_secret = new_oauth1_user_handler.get_access_token( | |
oauth_verifier | |
) | |
their_client = tweepy.Client( | |
bearer_token=bearer, | |
consumer_key=consumer_token, | |
consumer_secret=consumer_secret, | |
access_token=access_token, | |
access_token_secret=access_token_secret | |
) | |
# TODO: The below is not necessary and can be removed. | |
global client | |
client = their_client | |
return their_client | |
def block_user(user_id, user, reason): | |
finished = False | |
blocked = True | |
attempts = 0 | |
while not finished: | |
try: | |
print("preparing to block {}".format(user_id)) | |
client.block(target_user_id=user_id) | |
print("User blocked") | |
except tweepy.errors.TooManyRequests as e: | |
try: | |
client.mute(target_user_id=user_id) | |
print("Could not block, so muted") | |
except tweepy.errors.TooManyRequests as e: | |
if attempts == 0: | |
print("waiting 15 minutes for rate limit to finish") | |
time.sleep(900) | |
attempts = attempts + 1 | |
continue | |
else: | |
finished = True | |
blocked = False | |
continue | |
except tweepy.errors.BadRequest as e: | |
print("bad request error") | |
print(e) | |
finished = True | |
blocked = False | |
continue | |
except tweepy.errors.BadRequest as e: | |
print("bad request error") | |
print(e) | |
finished = True | |
blocked = False | |
continue | |
except: | |
time.sleep(240) | |
continue | |
#time.sleep(1) | |
finished = True | |
try: | |
me = client.get_me() | |
print("{} blocked {}, for {}".format(me.data["username"], user, reason)) | |
except tweepy.errors.TooManyRequests as e: | |
print("Blocked {}, for {}".format(user, reason)) | |
return blocked | |
def block_users(client, threshold, dataset): | |
''' | |
Used for blocking a series of users based on the threshold and datasets provided. Here the users folder is used. | |
:param client: | |
:param threshold: | |
:param dataset: | |
:return: The number of blocked users. | |
''' | |
num_users_blocked = 0 | |
for filename in os.listdir("users"): | |
filename = os.path.join("users", filename) | |
print("File {} open".format(filename)) | |
user_file = open(filename, "r") | |
users = json.load(user_file) | |
for user in users: | |
print("Reviewing user {}".format(user)) | |
if "threshold" in user: | |
# old type of dataset being used, only 'violent' data available | |
if "Violent" in dataset: | |
if user["threshold"]/2 >= threshold: | |
user_id = str(user["username"]) | |
if block_user(user_id, user, "Violent - old dataset"): | |
num_users_blocked = num_users_blocked + 1 | |
else: | |
# modern dataset being used | |
if "Violent" in dataset: | |
if user["violence-threshold"]/2 >= threshold: | |
user_id = str(user["username"]) | |
if block_user(user_id, user, "Violent"): | |
num_users_blocked = num_users_blocked + 1 | |
continue | |
if "Hate Speech" in dataset: | |
if user["toxicity-threshold"] >= threshold: | |
user_id = str(user["username"]) | |
if block_user(user_id, user, "Hate Speech"): | |
num_users_blocked = num_users_blocked + 1 | |
continue | |
return num_users_blocked | |
def chat(selected_option=None, radio_score=None, url_params=None): | |
''' | |
This function is used to initialise blocking users once the user has authenticated with Twitter. | |
:param selected_option: | |
:param radio_score: | |
:param url_params: | |
:return: the chatbot history is returned (including information on blocked accounts). | |
''' | |
global client | |
global chat_history | |
history = [] | |
# app id | |
if "oauth_verifier" in url_params and "oauth_token" in url_params and client is None: | |
client = get_client_from_tokens(url_params["oauth_verifier"], url_params["oauth_token"]) | |
if radio_score != None and selected_option != None: | |
if client != None: | |
# Extract the list to a string representation | |
if type(selected_option) is list: | |
block_type = "" | |
for b_type in selected_option: | |
block_type = block_type + " + " + b_type.capitalize() | |
block_type = "'" + block_type[3:] + "'" | |
else: | |
block_type = selected_option | |
# Display to user, set options | |
history.append( | |
["Model tuned to a '{}%' threshold and is using the {} dataset.".format(radio_score, block_type.capitalize()), | |
"{} Account blocking initialised".format(block_type.capitalize())]) | |
num_users_blocked = block_users(client, radio_score, selected_option) | |
history.append( | |
["Blocked {} user account(s).".format(num_users_blocked), "Thank you for using Watchtower."]) | |
elif radio_score != None or selected_option != None: | |
chat_history.append(["Initialisation error!", "Please tune the model by using the above options"]) | |
history = chat_history + history | |
chatbot.value = history | |
chatbot.update(value=history) | |
client = None | |
return history | |
def infer(prompt): | |
pass | |
have_initialised = False | |
client = None | |
name = None | |
def button_pressed(slider_value, url_params): | |
# print(url_params) | |
return [None, chat(radio.value, slider_value, url_params)] | |
# The website that the user will visit to authenticate WatchTower. | |
target_website = None | |
def update_target_website(): | |
''' | |
Updates the URL used to authenticate WatchTower with Twitter. | |
#TODO this function is full of old code and can be optimised. | |
:return: | |
''' | |
global have_initialised | |
global chatbot | |
global chat_history | |
global client | |
global name | |
client = None | |
name = "no username" | |
chat_history = [ | |
["Welcome to Watchtower.".format(name), "Log in via Twitter and configure your blocking options above."]] | |
chatbot.value = chat_history | |
chatbot.update(value=chat_history) | |
twitter_auth_button.value = '<a href={}><img src="https://cdn.cms-twdigitalassets.com/content/dam/developer-twitter/auth-docs/sign-in-with-twitter-gray.png.twimg.1920.png" alt="Log In With Twitter"></a><br>'.format( | |
get_target_website()) | |
twitter_auth_button.update( | |
value='<a href={}><img src="https://cdn.cms-twdigitalassets.com/content/dam/developer-twitter/auth-docs/sign-in-with-twitter-gray.png.twimg.1920.png" alt="Log In With Twitter"></a><br>'.format( | |
get_target_website())) | |
return '<a href={}><img src="https://cdn.cms-twdigitalassets.com/content/dam/developer-twitter/auth-docs/sign-in-with-twitter-gray.png.twimg.1920.png" alt="Log In With Twitter"></a><br>'.format( | |
get_target_website()) | |
# The below is a JS blob used to retrieve the URL params. | |
# Thanks to here: https://discuss.huggingface.co/t/hugging-face-and-gradio-url-paramiters/21110/2 | |
get_window_url_params = """ | |
function(text_input, url_params) { | |
console.log(text_input, url_params); | |
const params = new URLSearchParams(window.location.search); | |
url_params = Object.fromEntries(params); | |
return [text_input, url_params]; | |
} | |
""" | |
def get_chatbot_text(): | |
return [('Welcome to Watchtower.', 'Log in via Twitter and configure your blocking options above.')] | |
def get_target_website(): | |
''' | |
A wrapper function used for retrieving the URL a user will use to authenticate WatchTower with Twitter. | |
:return: | |
''' | |
oauth1_user_handler = tweepy.OAuth1UserHandler( | |
consumer_token, consumer_secret, | |
callback="https://hf.space/embed/User1342/WatchTower/" | |
) | |
target_website = oauth1_user_handler.get_authorization_url(signin_with_twitter=True) | |
return target_website | |
# The Gradio HTML component used for the 'sign in with Twitter' button | |
# The main chunk of code that uses Gradio blocks to create the UI | |
html_button = None | |
with block: | |
gr.HTML(''' | |
<meta name="viewport" content="width=device-width, initial-scale=1"> | |
<link rel="stylesheet" href="https://www.w3schools.com/w3css/4/w3.css"> | |
''') | |
# todo check if user signed in | |
user_message = "Log in via Twitter and configure your blocking options above." | |
chat_history.append(["Welcome to Watchtower.", user_message]) | |
gr.HTML(value=html_data) | |
with gr.Group(): | |
with gr.Row().style(equal_height=True): | |
with gr.Box(): | |
#gr.Label(value="WatchTower", visible=True, interactive=False) | |
url_params = gr.JSON({}, visible=False, label="URL Params").style( | |
) | |
text_input = gr.Text(label="Input", visible=False).style() | |
text_output = gr.Text(label="Output", visible=False).style() | |
html_button = twitter_auth_button = gr.HTML( | |
value='<a href={}><img src="https://cdn.cms-twdigitalassets.com/content/dam/developer-twitter/auth-docs/sign-in-with-twitter-gray.png.twimg.1920.png" alt="Log In With Twitter"></a><br>'.format( | |
get_target_website())).style( | |
) | |
with gr.Row().style(equal_height=True): | |
radio = gr.CheckboxGroup(value=["Violent", "Hate Speech"], choices=["Violent", "Hate Speech", "Misinformation"], | |
interactive=False, label="Behaviour To Block").style() | |
slider = gr.Slider(value=30, interactive=True, label="Threshold Confidence Tolerance") | |
chatbot = gr.Chatbot(label="Watchtower Output", value=get_chatbot_text()).style(color_map=["blue","grey"]) | |
btn = gr.Button("Run WatchTower").style(full_width=True).style() | |
btn.click(fn=button_pressed, inputs=[slider, url_params], | |
outputs=[text_output, chatbot], _js=get_window_url_params) | |
gr.Markdown( | |
"""___ | |
<p style='text-align: center'> | |
Created by <a href="https://twitter.com/_JamesStevenson" target="_blank"</a> James Stevenson | |
<br/> | |
</p>""" | |
) | |
# Setup callback for when page loads (used to set a new Twitter auth target webspage) | |
block.__enter__() | |
block.set_event_trigger( | |
event_name="load", fn=update_target_website, inputs=None, outputs=[html_button], no_target=True | |
) | |
block.set_event_trigger( | |
event_name="load", fn=get_chatbot_text, inputs=None, outputs=[chatbot], no_target=True | |
) | |
#block.attach_load_events() | |
# Launcg the page | |
block.launch(enable_queue = True) |