WatchTower / app.py
User1342's picture
Update app.py
f26110a
#!/usr/bin/env python
# coding: utf-8
import json
import os
import re
import time
from random import random
import socket
from threading import Thread
from time import sleep
# Twitter API credentials, looked up in the process environment.
# Any variable that is not set leaves the corresponding name bound to None.
(
    consumer_token,
    consumer_secret,
    my_access_token,
    my_access_secret,
    bearer,
) = (
    os.environ.get(variable_name)
    for variable_name in (
        'CONSUMER_TOKEN',
        'CONSUMER_SECRET',
        'ACCESS_TOKEN',
        'ACCESS_SECRET',
        'BEARER',
    )
)
# Static landing-page markup (title, usage instructions and three external link buttons).
# It is handed unchanged to a gr.HTML component when the UI is built.
# NOTE(review): the inline slideshow script looks up elements with class "mySlides" and the
# buttons' onclick handlers look up an element with id "download"; neither appears anywhere
# in this markup, so both look like leftovers from a page template - confirm and remove.
html_data = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://www.w3schools.com/w3css/4/w3.css">
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Poppins">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<style>
body,h1,h2,h3,h4,h5 {font-family: "Poppins", sans-serif}
body {font-size: 16px;}
img {margin-bottom: -8px;}
.mySlides {display: none;}
</style>
</head>
<body class="w3-content w3-black" style="max-width:1500px;">
<!-- The App Section -->
<div class="w3-padding-large w3-white">
<div class="w3-row-padding-large">
<div class="w3-col">
<h1 class="w3-jumbo"><b>WatchTower 🐦🚧</b></h1>
<h1 class="w3-xxxlarge w3-text-blue"><b>Remove Unfavorable Tweets From Your Feed </b></h1>
<p><span class="w3-xlarge">Scroll down to use WatchTower 1.0. ⬇ </span> WatchTower is a tool that identifies hate speech, misinformation, and extremist content and blocks/ mutes it from your Twitter feed. WatchTower blocks content based on it's current database, so make sure to come back regularly to ensure you're up to date! We use a queue system, which means <b> you may need to wait your turn to run WatchTower</b> - however, once you've clicked run, you can close the tab as WatchTower will continue in the background. WatchTower is simple to use: first scroll down the page and click the 'sign in with Twitter' button, then you'll be taken to the Twitter website and asked to verify yourself, after this you'll be taken back here, then simply scroll down to the bottom of the page and click run!</p>
<a href="https://www.watchtower.cartographer.one/"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'">
<i class=""></i> Find Out More! 💬
</button></a>
<a href="https://ko-fi.com/jamesstevenson"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'">
<i class=""></i> Support The Creator! ❤
</button></a>
<a href="https://twitter.com/WATCHTOWER_WEB"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'">
<i class=""></i> Follow Us! 🐦
</button></a>
</div>
</div>
</div>
<!-- Modal -->
<script>
// Slideshow
var slideIndex = 1;
showDivs(slideIndex);
function plusDivs(n) {
showDivs(slideIndex += n);
}
function showDivs(n) {
var i;
var x = document.getElementsByClassName("mySlides");
if (n > x.length) {slideIndex = 1}
if (n < 1) {slideIndex = x.length}
for (i = 0; i < x.length; i++) {
x[i].style.display = "none";
}
x[slideIndex-1].style.display = "block";
}
</script>
<br>
</body>
</html>
'''
# Imports
import json
import os
import time
import gradio as gr
import tweepy
# Root Gradio Blocks container for the page, with some generic CSS.
# The stylesheet is written as adjacent string literals, which Python joins into one string.
block = gr.Blocks(
    css=(
        ".container { max-width: 800px; margin: auto; } "
        "h1 { margin: 0px; padding: 5px 0; line-height: 50px; font-size: 60pt; }"
        ".close-heading {margin: 0px; padding: 0px;} "
        ".close-heading p { margin: 0px; padding: 0px;}"
    ),
    title="WatchTower",
)

# Chat history shown in the chatbot component; starts out empty and is filled in further down.
chat_history = list()
def get_client_from_tokens(oauth_verifier, oauth_token):
    '''
    Exchange the OAuth values from the sign-in redirect for a user-scoped Tweepy client.
    The new client is also stored in the module-level ``client`` variable.
    :param oauth_verifier: the oauth_verifier value taken from the page's URL parameters
    :param oauth_token: the oauth_token value taken from the page's URL parameters
    :return: A Tweepy client object
    '''
    global client

    handler = tweepy.OAuth1UserHandler(
        consumer_token,
        consumer_secret,
        callback="https://hf.space/embed/User1342/WatchTower/",
    )
    # NOTE(review): the consumer secret is supplied as the request-token secret here;
    # confirm this is intentional rather than the secret issued with the request token.
    handler.request_token = dict(
        oauth_token=oauth_token,
        oauth_token_secret=consumer_secret,
    )
    user_token, user_secret = handler.get_access_token(oauth_verifier)

    user_client = tweepy.Client(
        bearer_token=bearer,
        consumer_key=consumer_token,
        consumer_secret=consumer_secret,
        access_token=user_token,
        access_token_secret=user_secret,
    )

    # TODO: The below is not necessary and can be removed.
    client = user_client
    return user_client
# Pause used when Twitter reports that the rate limit is exhausted (15 minutes).
_RATE_LIMIT_WAIT_SECONDS = 900
# Pause between retries after an unexpected error, and how many such retries are allowed.
_RETRY_WAIT_SECONDS = 240
_MAX_UNEXPECTED_RETRIES = 3


def block_user(user_id, user, reason, api_client=None):
    '''
    Block a single account, falling back to muting it when blocking is rate limited.
    If both block and mute are rate limited the function waits 15 minutes once and tries
    again; a second rate-limit failure gives up. Requests that Twitter rejects outright are
    not retried, and unexpected errors are retried a bounded number of times instead of
    looping forever.
    :param user_id: the value passed to Twitter as target_user_id
    :param user: the dataset record for the account (only used in log output)
    :param reason: a short label describing why the account is being blocked (log output)
    :param api_client: the Tweepy client to act with; when omitted the module-level
        ``client`` is used, which keeps older three-argument callers working
    :return: True when the account was blocked or muted, otherwise False
    '''
    if api_client is None:
        # Backward-compatible fallback for callers that rely on the shared module-level client.
        api_client = client
    if api_client is None:
        print("No authenticated client available, skipping {}".format(user_id))
        return False

    blocked = False
    waited_for_rate_limit = False
    unexpected_failures = 0
    while True:
        try:
            print("preparing to block {}".format(user_id))
            try:
                api_client.block(target_user_id=user_id)
                print("User blocked")
            except tweepy.errors.TooManyRequests:
                # Blocking is rate limited, so mute instead. A rate-limit error raised by
                # the mute call itself is handled by the outer handler below.
                api_client.mute(target_user_id=user_id)
                print("Could not block, so muted")
            blocked = True
            break
        except tweepy.errors.TooManyRequests:
            if waited_for_rate_limit:
                # Already waited out one rate-limit window for this account; give up.
                break
            print("waiting 15 minutes for rate limit to finish")
            time.sleep(_RATE_LIMIT_WAIT_SECONDS)
            waited_for_rate_limit = True
        except (
            tweepy.errors.BadRequest,
            tweepy.errors.Unauthorized,
            tweepy.errors.Forbidden,
            tweepy.errors.NotFound,
        ) as error:
            # Retrying cannot change the outcome of a rejected request.
            print("request rejected, skipping user")
            print(error)
            break
        except Exception as error:
            unexpected_failures += 1
            if unexpected_failures > _MAX_UNEXPECTED_RETRIES:
                print("giving up on {} after repeated errors: {}".format(user_id, error))
                break
            time.sleep(_RETRY_WAIT_SECONDS)

    if blocked:
        # Only report success when something was actually blocked or muted.
        try:
            me = api_client.get_me()
            print("{} blocked {}, for {}".format(me.data["username"], user, reason))
        except tweepy.errors.TweepyException:
            # The lookup is for logging only, so a failure here must not abort the run.
            print("Blocked {}, for {}".format(user, reason))
    return blocked


def _block_reason(user, threshold, dataset):
    '''Return the reason label under which ``user`` should be blocked, or None for no action.'''
    if "threshold" in user:
        # old type of dataset being used, only 'violent' data available
        if "Violent" in dataset and user["threshold"] / 2 >= threshold:
            return "Violent - old dataset"
        return None
    # modern dataset being used; the first matching category wins so that an account is
    # only ever submitted once.
    if "Violent" in dataset and user["violence-threshold"] / 2 >= threshold:
        return "Violent"
    if "Hate Speech" in dataset and user["toxicity-threshold"] >= threshold:
        return "Hate Speech"
    return None


def block_users(client, threshold, dataset):
    '''
    Used for blocking a series of users based on the threshold and datasets provided.
    Every file in the ``users`` folder is read as a JSON list of account records.
    :param client: the Tweepy client to block with; it is passed on to block_user
    :param threshold: minimum score a record needs before it is blocked (violence scores
        are halved before being compared)
    :param dataset: collection of selected categories, e.g. "Violent" and/or "Hate Speech"
    :return: The number of blocked users.
    '''
    num_users_blocked = 0
    for filename in os.listdir("users"):
        filename = os.path.join("users", filename)
        print("File {} open".format(filename))
        # Read inside a context manager so the file handle is always closed.
        with open(filename, "r", encoding="utf-8") as user_file:
            users = json.load(user_file)
        for user in users:
            print("Reviewing user {}".format(user))
            reason = _block_reason(user, threshold, dataset)
            if reason is None:
                continue
            # NOTE(review): the record's "username" field is what gets sent as the target
            # user id - confirm the dataset stores numeric ids under that key.
            user_id = str(user["username"])
            if block_user(user_id, user, reason, api_client=client):
                num_users_blocked = num_users_blocked + 1
    return num_users_blocked
def chat(selected_option=None, radio_score=None, url_params=None):
    '''
    This function is used to initialise blocking users once the user has authenticated with Twitter.
    The Twitter client is always built from the caller's own URL parameters; a client left
    behind in the module-level ``client`` variable by another run is never reused.
    :param selected_option: the selected categories to block (a list, or a single string)
    :param radio_score: the threshold value chosen by the user
    :param url_params: dict of the page's query-string parameters; ``oauth_verifier`` and
        ``oauth_token`` are read from it when present. None is treated as empty.
    :return: the chatbot history is returned (including information on blocked accounts).
    '''
    global client

    history = []
    # The parameter defaults to None, which cannot be searched with ``in``.
    url_params = url_params or {}

    user_client = None
    if "oauth_verifier" in url_params and "oauth_token" in url_params:
        try:
            user_client = get_client_from_tokens(url_params["oauth_verifier"], url_params["oauth_token"])
        except tweepy.errors.TweepyException as error:
            print("Twitter authentication failed: {}".format(error))
            history.append(["Authentication error!", "Please log in via Twitter again and re-run WatchTower."])

    # block_user may still read the module-level client, so keep it in step with this run.
    client = user_client
    try:
        if radio_score is not None and selected_option is not None:
            if user_client is not None:
                # Extract the list to a string representation
                if isinstance(selected_option, list):
                    block_type = "'{}'".format(" + ".join(str(option) for option in selected_option))
                else:
                    block_type = str(selected_option).capitalize()

                # Display to user, set options
                history.append(
                    ["Model tuned to a '{}%' threshold and is using the {} dataset.".format(radio_score, block_type),
                     "{} Account blocking initialised".format(block_type)])

                num_users_blocked = block_users(user_client, radio_score, selected_option)
                history.append(
                    ["Blocked {} user account(s).".format(num_users_blocked), "Thank you for using Watchtower."])
            elif not history:
                # Nothing has been reported yet, so tell the user why nothing happened.
                history.append(["Not logged in!", "Please log in via Twitter before running WatchTower."])
        elif radio_score is not None or selected_option is not None:
            # Kept in the per-call history so the message does not pile up in the shared
            # chat_history across calls.
            history.append(["Initialisation error!", "Please tune the model by using the above options"])
    finally:
        # Never leave a user's client behind in module state, even if blocking raised.
        client = None

    history = chat_history + history
    # NOTE(review): the returned list is what the click handler sends to the chatbot output;
    # the two statements below are kept from the original - confirm they are still needed.
    chatbot.value = history
    chatbot.update(value=history)
    return history
def infer(prompt):
    '''
    Placeholder that accepts a prompt and does nothing with it.
    :param prompt: ignored
    :return: None
    '''
    return None
# Module-level state shared by the handlers in this file.
# client: the Tweepy client of the signed-in user, or None when nobody is signed in.
# name: placeholder for a username; nothing is known until a page load assigns it.
client = name = None
# have_initialised: starts out False.
have_initialised = False
def button_pressed(slider_value, url_params):
    '''
    Click handler for the run button: runs chat() with the checkbox group's value,
    the slider value and the URL parameters.
    :param slider_value: the threshold chosen on the slider
    :param url_params: the page's URL parameters
    :return: a two-item list, None followed by the chat history returned by chat()
    '''
    chat_output = chat(radio.value, slider_value, url_params)
    return [None, chat_output]
# The website that the user will visit to authenticate WatchTower.
target_website = None


def update_target_website():
    '''
    Page-load callback: resets the shared session state and the chatbot text, then builds
    a fresh 'sign in with Twitter' link.
    The authorisation URL is requested once per call, so the link stored on the button
    component and the link returned for display are the same.
    #TODO this function is full of old code and can be optimised.
    :return: the HTML for the 'sign in with Twitter' button
    '''
    global chat_history
    global client
    global name

    client = None
    name = "no username"
    chat_history = [
        ["Welcome to Watchtower.", "Log in via Twitter and configure your blocking options above."]]
    chatbot.value = chat_history
    chatbot.update(value=chat_history)

    sign_in_html = '<a href={}><img src="https://cdn.cms-twdigitalassets.com/content/dam/developer-twitter/auth-docs/sign-in-with-twitter-gray.png.twimg.1920.png" alt="Log In With Twitter"></a><br>'.format(
        get_target_website())
    twitter_auth_button.value = sign_in_html
    twitter_auth_button.update(value=sign_in_html)
    return sign_in_html
# The below is a JS blob used to retrieve the URL params.
# Thanks to here: https://discuss.huggingface.co/t/hugging-face-and-gradio-url-paramiters/21110/2
# It is supplied as the _js argument of the run button's click handler. The function logs
# its two arguments, discards the url_params value it was given, and returns the first
# argument unchanged together with the page's query-string parameters as a plain object.
get_window_url_params = """
function(text_input, url_params) {
console.log(text_input, url_params);
const params = new URLSearchParams(window.location.search);
url_params = Object.fromEntries(params);
return [text_input, url_params];
}
"""
def get_chatbot_text():
    '''
    Provides the initial content of the chatbot component.
    :return: a list holding one (message, reply) tuple with the welcome text
    '''
    welcome_pair = (
        'Welcome to Watchtower.',
        'Log in via Twitter and configure your blocking options above.',
    )
    return [welcome_pair]
def get_target_website():
    '''
    Builds a new OAuth 1 handler and asks it for the 'sign in with Twitter' authorisation URL.
    :return: the URL a user visits to authenticate WatchTower with Twitter
    '''
    return tweepy.OAuth1UserHandler(
        consumer_token,
        consumer_secret,
        callback="https://hf.space/embed/User1342/WatchTower/",
    ).get_authorization_url(signin_with_twitter=True)
# The Gradio HTML component used for the 'sign in with Twitter' button
html_button = None

# The main chunk of code that uses Gradio blocks to create the UI
# NOTE(review): the nesting of the layout containers below is a reconstruction - confirm it
# against the deployed page before relying on the exact grouping.
with block:
    gr.HTML('''
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://www.w3schools.com/w3css/4/w3.css">
''')

    # todo check if user signed in
    user_message = "Log in via Twitter and configure your blocking options above."
    chat_history.append(["Welcome to Watchtower.", user_message])

    # Landing-page header.
    gr.HTML(value=html_data)

    with gr.Group():
        with gr.Row().style(equal_height=True):
            with gr.Box():
                #gr.Label(value="WatchTower", visible=True, interactive=False)
                # Hidden components: url_params receives the page's query-string parameters
                # from the click handler's JS, text_output receives the handler's first
                # return value.
                url_params = gr.JSON({}, visible=False, label="URL Params").style(
                )
                text_input = gr.Text(label="Input", visible=False).style()
                text_output = gr.Text(label="Output", visible=False).style()

                html_button = twitter_auth_button = gr.HTML(
                    value='<a href={}><img src="https://cdn.cms-twdigitalassets.com/content/dam/developer-twitter/auth-docs/sign-in-with-twitter-gray.png.twimg.1920.png" alt="Log In With Twitter"></a><br>'.format(
                        get_target_website())).style(
                )

        with gr.Row().style(equal_height=True):
            radio = gr.CheckboxGroup(value=["Violent", "Hate Speech"], choices=["Violent", "Hate Speech", "Misinformation"],
                                     interactive=False, label="Behaviour To Block").style()
            slider = gr.Slider(value=30, interactive=True, label="Threshold Confidence Tolerance")

        chatbot = gr.Chatbot(label="Watchtower Output", value=get_chatbot_text()).style(color_map=["blue","grey"])

        btn = gr.Button("Run WatchTower").style(full_width=True).style()
        btn.click(fn=button_pressed, inputs=[slider, url_params],
                  outputs=[text_output, chatbot], _js=get_window_url_params)

    # Footer. The anchor tag is now properly opened and closed around the author's name.
    gr.Markdown(
        """___
<p style='text-align: center'>
Created by <a href="https://twitter.com/_JamesStevenson" target="_blank">James Stevenson</a>
<br/>
</p>"""
    )
# Setup callback for when page loads (used to set a new Twitter auth target webpage)
# NOTE(review): the Blocks context is entered by hand here and no matching exit call is
# visible in this file - confirm this is what the installed Gradio version requires.
block.__enter__()
# On page load: refresh the sign-in link shown in the html_button component.
block.set_event_trigger(
    event_name="load", fn=update_target_website, inputs=None, outputs=[html_button], no_target=True
)
# On page load: reset the chatbot component to the welcome text.
block.set_event_trigger(
    event_name="load", fn=get_chatbot_text, inputs=None, outputs=[chatbot], no_target=True
)
#block.attach_load_events()
# Launch the page
block.launch(enable_queue = True)