WatchTower / app.py
User1342's picture
Update app.py
f178e64
raw history blame
No virus
14.4 kB
#!/usr/bin/env python
# coding: utf-8
# Imports
import json
import os
import time
import gradio as gr
import tweepy
# Twitter keys
consumer_token = os.getenv('CONSUMER_TOKEN')
consumer_secret = os.getenv('CONSUMER_SECRET')
my_access_token = os.getenv('ACCESS_TOKEN')
my_access_secret = os.getenv('ACCESS_SECRET')
bearer = os.getenv('BEARER')
html_data = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://www.w3schools.com/w3css/4/w3.css">
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Poppins">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<style>
body,h1,h2,h3,h4,h5 {font-family: "Poppins", sans-serif}
body {font-size: 16px;}
img {margin-bottom: -8px;}
.mySlides {display: none;}
</style>
</head>
<body class="w3-content w3-black" style="max-width:1500px;">
<!-- The App Section -->
<div class="w3-padding-64 w3-white">
<div class="w3-row-padding">
<div class="w3-col l8 m6">
<h1 class="w3-jumbo"><b>WatchTower 🐦🚧</b></h1>
<h1 class="w3-xxxlarge w3-text-orange"><b>Block Unfavorable Tweets From Your Feed </b></h1>
<p><span class="w3-xlarge">Scroll down to use WatchTower 1.0. ⬇ </span> WatchTower is a tool that identifies hate speech, misinformation, and extremist content and blocks it from your Twitter feed. WatchTower blocks content based on it's current database, so make sure to come back regullary to ensure you're up to date! WatchTower is simple to use: first scroll down the page and click the 'sign in with Twitter' button, then you'll be taken to the Twitter website and asked to verify yourself, after this you'll be taken back here, then simply scroll down to the botton of the page and click run!</p>
<a href="https://www.watchtower.cartographer.one/"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'">
<i class=""></i> Find Out More! 💬
</button></a>
<a href="https://ko-fi.com/jamesstevenson"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'">
<i class=""></i> Support The Creator! ❤
</button></a>
<a href="https://twitter.com/WATCHTOWER_WEB"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'">
<i class=""></i> Follow Us! 🐦
</button></a>
</div>
</div>
</div>
<!-- Modal -->
<div id="download" class="w3-modal w3-animate-opacity">
<div class="w3-modal-content" style="padding:32px">
<div class="w3-container w3-white">
<i onclick="document.getElementById('download').style.display='none'" class="fa fa-remove w3-xlarge w3-button w3-transparent w3-right w3-xlarge"></i>
<h2 class="w3-wide">DOWNLOAD</h2>
<p>Download the app in AppStore, Google Play or Microsoft Store.</p>
<i class="fa fa-android w3-large"></i> <i class="fa fa-apple w3-large"></i> <i class="fa fa-windows w3-large"></i>
<p><input class="w3-input w3-border" type="text" placeholder="Enter e-mail"></p>
<button type="button" class="w3-button w3-block w3-padding-large w3-red w3-margin-bottom" onclick="document.getElementById('download').style.display='none'">Fake Download</button>
</div>
</div>
</div>
<script>
// Slideshow
var slideIndex = 1;
showDivs(slideIndex);
function plusDivs(n) {
showDivs(slideIndex += n);
}
function showDivs(n) {
var i;
var x = document.getElementsByClassName("mySlides");
if (n > x.length) {slideIndex = 1}
if (n < 1) {slideIndex = x.length}
for (i = 0; i < x.length; i++) {
x[i].style.display = "none";
}
x[slideIndex-1].style.display = "block";
}
</script>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
</body>
</html>
'''
# Imports
import json
import os
import time
import gradio as gr
import tweepy
# Setup the gradio block and add some generic CSS
block = gr.Blocks(css="", title="WatchTower")
# Chat history variable used for the chatbot prompt on the 'getting started' page.
chat_history = []
def get_client_from_tokens(oauth_verifier, oauth_token):
'''
This function is used for generating a Tweepy client object based on Oauth verifier and token paramiters
:param oauth_verifier:
:param oauth_token:
:return: A Tweepy client object
'''
new_oauth1_user_handler = tweepy.OAuth1UserHandler(
consumer_token, consumer_secret,
callback="https://hf.space/embed/User1342/WatchTower/"
)
new_oauth1_user_handler.request_token = {
"oauth_token": oauth_token,
"oauth_token_secret": consumer_secret
}
access_token, access_token_secret = new_oauth1_user_handler.get_access_token(
oauth_verifier
)
their_client = tweepy.Client(
bearer_token=bearer,
consumer_key=consumer_token,
consumer_secret=consumer_secret,
access_token=access_token,
access_token_secret=access_token_secret
)
# TODO: The below is not necessary and can be removed.
global client
client = their_client
return their_client
def block_users(client, threshold, dataset):
'''
Used for blocking a series of users based on the threshold and datasets provided. Here the users folder is used.
TODO: Datasets not implemented.
:param client:
:param threshold:
:param dataset:
:return: The number of blocked users.
'''
num_users_blocked = 0
for filename in os.listdir("users"):
filename = os.path.join("users", filename)
print("File {} open".format(filename))
user_file = open(filename, "r")
users = json.load(user_file)
for user in users:
print("Reviewing user {}".format(user))
if user["threshold"] >= threshold:
user_id = str(user["username"])
finished = False
while not finished:
try:
print("preparing to block {}".format(user_id))
client.block(target_user_id=user_id)
print("User blocked")
except tweepy.errors.TooManyRequests as e:
print("time out error")
print(e)
finished = True
# time.sleep(240)
continue
except tweepy.errors.BadRequest as e:
print("bad request error")
print(e)
finished = True
continue
except:
time.sleep(240)
continue
time.sleep(1)
finished = True
me = client.get_me()
print("{} blocked {}".format(me.data["username"], user))
num_users_blocked = num_users_blocked + 1
return num_users_blocked
def chat(selected_option=None, radio_score=None, url_params=None):
'''
This function is used to initialise blocking users once the user has authenticated with Twitter.
:param selected_option:
:param radio_score:
:param url_params:
:return: the chatbot history is returned (including information on blocked accounts).
'''
global client
global chat_history
history = []
# app id
if "oauth_verifier" in url_params and "oauth_token" in url_params and client is None:
client = get_client_from_tokens(url_params["oauth_verifier"], url_params["oauth_token"])
if radio_score != None and selected_option != None:
if client != None:
history.append(
["Model tuned to a '{}%' threshold and is using the '{}' dataset.".format(radio_score, selected_option),
"{} Account blocking initialised".format(str(selected_option).capitalize())])
num_users_blocked = block_users(client, radio_score, selected_option)
history.append(
["Blocked {} user account(s).".format(num_users_blocked), "Thank you for using Watchtower."])
elif radio_score != None or selected_option != None:
chat_history.append(["Initialisation error!", "Please tune the model by using the above options"])
history = chat_history + history
chatbot.value = history
chatbot.update(value=history)
client = None
return history
def infer(prompt):
pass
have_initialised = False
client = None
name = None
def button_pressed(slider_value, url_params):
# print(url_params)
return [None, chat(radio.value, slider_value, url_params)]
# The website that the user will visit to authenticate WatchTower.
target_website = None
def update_target_website():
'''
Updates the URL used to authenticate WatchTower with Twitter.
#TODO this function is full of old code and can be optimised.
:return:
'''
global have_initialised
global chatbot
global chat_history
global client
global name
client = None
name = "no username"
chat_history = [
["Welcome to Watchtower.".format(name), "Log in via Twitter and configure your blocking options above."]]
chatbot.value = chat_history
chatbot.update(value=chat_history)
twitter_auth_button.value = '<a href={}><img src="https://cdn.cms-twdigitalassets.com/content/dam/developer-twitter/auth-docs/sign-in-with-twitter-gray.png.twimg.1920.png" alt="Log In With Twitter"></a><br>'.format(
get_target_website())
twitter_auth_button.update(
value='<a href={}><img src="https://cdn.cms-twdigitalassets.com/content/dam/developer-twitter/auth-docs/sign-in-with-twitter-gray.png.twimg.1920.png" alt="Log In With Twitter"></a><br>'.format(
get_target_website()))
return '<a href={}><img src="https://cdn.cms-twdigitalassets.com/content/dam/developer-twitter/auth-docs/sign-in-with-twitter-gray.png.twimg.1920.png" alt="Log In With Twitter"></a><br>'.format(
get_target_website())
# The below is a JS blob used to retrieve the URL params.
# Thanks to here: https://discuss.huggingface.co/t/hugging-face-and-gradio-url-paramiters/21110/2
get_window_url_params = """
function(text_input, url_params) {
console.log(text_input, url_params);
const params = new URLSearchParams(window.location.search);
url_params = Object.fromEntries(params);
return [text_input, url_params];
}
"""
def get_target_website():
'''
A wrapper function used for retrieving the URL a user will use to authenticate WatchTower with Twitter.
:return:
'''
oauth1_user_handler = tweepy.OAuth1UserHandler(
consumer_token, consumer_secret,
callback="https://hf.space/embed/User1342/WatchTower/"
)
target_website = oauth1_user_handler.get_authorization_url(signin_with_twitter=True)
return target_website
# The Gradio HTML component used for the 'sign in with Twitter' button
# The main chunk of code that uses Gradio blocks to create the UI
html_button = None
with block:
gr.HTML('''
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://www.w3schools.com/w3css/4/w3.css">
''')
# todo check if user signed in
user_message = "Log in via Twitter and configure your blocking options above."
chat_history.append(["Welcome to Watchtower.", user_message])
gr.HTML(value=html_data)
with gr.Group():
with gr.Row().style(mobile_collapse=False, equal_height=True):
gr.HTML(value=r'<img src="https://www.watchtower.cartographer.one/wp-content/uploads/2022/10/Blocking-Unfavorable-Content-1.png" alt="Markdown Monster icon" style="float: left; margin-right: 10px;" width="350"/>').style(
)
with gr.Box():
#gr.Label(value="WatchTower", visible=True, interactive=False)
url_params = gr.JSON({}, visible=False, label="URL Params").style(
)
text_input = gr.Text(label="Input", visible=False).style(
rounded=(False, True, True, False),
)
text_output = gr.Text(label="Output", visible=False).style(
rounded=(False, True, True, False),
)
html_button = twitter_auth_button = gr.HTML(
value='<a href={}><img src="https://cdn.cms-twdigitalassets.com/content/dam/developer-twitter/auth-docs/sign-in-with-twitter-gray.png.twimg.1920.png" alt="Log In With Twitter"></a><br>'.format(
get_target_website())).style(
)
with gr.Row().style(mobile_collapse=True, equal_height=True):
radio = gr.CheckboxGroup(value="Violent", choices=["Violent", "Hate Speech", "Misinformation"],
interactive=False, label="Behaviour To Block").style(
rounded=(False, True, True, False),
)
slider = gr.Slider(value=5, interactive=True, label="Threshold Confidence Tolerance")
chatbot = gr.Chatbot(value=chat_history, label="Watchtower Output").style(
rounded=(False, True, True, False),
)
btn = gr.Button("Run WatchTower").style(
margin=False,
rounded=(False, True, True, False), full_width=True
)
btn.click(fn=button_pressed, inputs=[slider, url_params],
outputs=[text_output, chatbot], _js=get_window_url_params)
gr.Markdown(
"""___
<p style='text-align: center'>
Created by <a href="https://twitter.com/_JamesStevenson" target="_blank"</a> James Stevenson
<br/>
</p>"""
)
# Setup callback for when page loads (used to set a new Twitter auth target webspage)
block.__enter__()
block.set_event_trigger(
event_name="load", fn=update_target_website, inputs=None, outputs=[html_button], no_target=True
)
# Launcg the page
block.launch(favicon_path="https://www.jamesstevenson.me/wp-content/uploads/2022/08/Untitled-design-32.png")