# WatchTower / radical_tweet_aggregator.py
import gc
import json
import os
import time
import unicodedata
from datetime import date
from pathlib import Path

import tweepy
from detoxify import Detoxify
from googletrans import Translator
from igraph import Graph

from predictor import predictor
# Twitter API keys
consumer_token = os.getenv('CONSUMER_TOKEN')
consumer_secret = os.getenv('CONSUMER_SECRET')
my_access_token = os.getenv('ACCESS_TOKEN')
my_access_secret = os.getenv('ACCESS_SECRET')
bearer = os.getenv('BEARER')
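# Hedged helper (not in the original script, name is illustrative): a small sanity
# check that can be called before streaming to confirm every expected credential
# environment variable is set, rather than hitting a less obvious Tweepy auth
# error later. Defined but not called, so module behaviour is unchanged.
def check_credentials_present():
    required = {
        "CONSUMER_TOKEN": consumer_token,
        "CONSUMER_SECRET": consumer_secret,
        "ACCESS_TOKEN": my_access_token,
        "ACCESS_SECRET": my_access_secret,
        "BEARER": bearer,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise RuntimeError("Missing Twitter credentials: {}".format(", ".join(missing)))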
# TODO: is this needed for mapping the object type after reading the pickle files? If not remove.
class grapher():
"""
A wrapper class used for generating a graph for interactions between users
"""
graph = None
def __init__(self):
"""
Constructor.
"""
self.graph = Graph()
def add_edge_wrapper(self, node_1_name, node_2_name, weight=1, relationship=None):
"""
A wrapper function used to add an edge connection or node.
:param node_1_name: from
:param node_2_name: to
:param weight:
:param relationship:
:return:
"""
# get node one ID
node_1 = None
for node in self.graph.vs:
if node["label"] == node_1_name.capitalize():
node_1 = node
        if node_1 is None:
self.graph.add_vertices(1)
node_count = self.graph.vcount()
self.graph.vs[node_count-1]["id"] = node_count-1
self.graph.vs[node_count-1]["label"] = node_1_name.capitalize()
node_1 = self.graph.vs[node_count-1]
# get node two id
node_2 = None
for node in self.graph.vs:
if node["label"] == node_2_name.capitalize():
node_2 = node
        if node_2 is None:
self.graph.add_vertices(1)
node_count = self.graph.vcount()
self.graph.vs[node_count - 1]["id"] = node_count - 1
self.graph.vs[node_count - 1]["label"] = node_2_name.capitalize()
node_2 = self.graph.vs[node_count - 1]
self.graph.add_edges([(node_1["id"], node_2["id"])])
def add_node(self, node_name):
"""
A wrapper function that adds a node with no edges to the graph
:param node_name:
"""
node_1 = None
for node in self.graph.vs:
if node["label"] == node_name.capitalize():
node_1 = node["id"]
        if node_1 is None:
self.graph.add_vertices(1)
node_count = self.graph.vcount()
self.graph.vs[node_count-1]["id"] = node_count-1
self.graph.vs[node_count-1]["label"] = node_name.capitalize()
node_1 = self.graph.vs[node_count-1]
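# Hedged usage sketch (not part of the original script, names are illustrative):
# how the grapher wrapper above could record an interaction between two
# hypothetical users. Defined as a function so importing or running this module
# does not build a graph as a side effect.
def example_grapher_usage():
    interactions = grapher()
    interactions.add_edge_wrapper("alice", "bob", weight=1, relationship="mention")
    interactions.add_node("carol")  # a user seen in the stream with no interactions yet
    return interactions.graph.summary()  # igraph summary string (vertex/edge counts)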
# Setup Tweepy API and client objects
auth = tweepy.OAuth1UserHandler(
consumer_token, consumer_secret,
my_access_token, my_access_secret
)
api = tweepy.API(auth)
client = tweepy.Client(
    bearer_token=bearer,
consumer_key=consumer_token,
consumer_secret=consumer_secret,
access_token=my_access_token,
access_token_secret=my_access_secret
)
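# Hedged sketch (not in the original script): confirm the OAuth1 credentials
# resolve to a real account before streaming, using tweepy.API.verify_credentials.
# Wrapped in a function so it only runs when explicitly called.
def verify_twitter_credentials():
    try:
        me = api.verify_credentials()
        print("Authenticated to Twitter as @{}".format(me.screen_name))
        return True
    except tweepy.TweepyException as error:
        print("Twitter credential check failed: {}".format(error))
        return False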
# This class is used for streaming Tweets via Tweepy
class IDPrinter(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        # Called by Tweepy for every Tweet received from the sampled stream
        self.translator = Translator()
        gc.collect()
if len(tweet.data["text"]) > 100:
if tweet and tweet.data:
if tweet.data["author_id"]:
tweet_data = tweet.data["text"].strip().replace("@", "").replace("\n","")
                    if tweet_data is not None and tweet_data != "":
username = client.get_user(id=tweet.author_id).data
# Ensure that Tweet is in English
lang = self.translator.detect(tweet_data).lang
if lang == "en":
tweet_data = unicodedata.normalize('NFKD', tweet_data).encode('ascii', 'ignore').decode()
                            if tweet_data is not None:
# Use Pinpoint to identify if a Tweet is extremist or not
is_extremist = predictor().predict(tweet_data)
toxicity_score = Detoxify('original').predict(tweet_data)["toxicity"]
print("user {} post extremist {}, toxicity {} - message: {}".format(username, is_extremist, toxicity_score, str(tweet_data)))
                                # If a Tweet is extremist or its toxicity is above 0.5, go through up to 100 of that
                                # user's recent posts and identify the percentage of posts that are extremist
if (is_extremist != None and is_extremist == 1) or (toxicity_score != None and toxicity_score >= 0.5):
tweets = client.get_users_tweets(id=tweet.author_id, max_results=100)
number_extreme = 0
                                    tweets = tweets.data  # Response.data holds the list of Tweet objects
list_of_toxicity_scores = []
for users_tweet in tweets:
                                        if users_tweet.text is not None:
tweet_msg = users_tweet.text
is_extremist = predictor().predict(tweet_msg)
toxicity_score = Detoxify('original').predict(tweet_msg)["toxicity"]
list_of_toxicity_scores.append(toxicity_score)
                                            if is_extremist:
                                                number_extreme += 1
#print(number_extreme)
threshold = number_extreme/len(tweets)
threshold = threshold * 100
toxicity_avg = sum(list_of_toxicity_scores) / len(tweets)
toxicity_avg = toxicity_avg * 100
#print("Threshold {}".format(threshold))
                                    if threshold > 1 or toxicity_avg > 1:
file_name = os.path.join("users","{}-{}-radical_users.txt".format(username,date.today().strftime("%b-%d-%Y")))
file_path = Path(file_name)
file_path.touch(exist_ok=True)
# Write user to a file in the user folder with the percentage of extremist posts
with open(file_name, 'w') as outfile:
json_to_dump = [{"username": username.id, "violence-threshold": threshold, "toxicity-threshold":toxicity_avg,
"date": date.today().strftime("%b-%d-%Y")}]
json.dump(json_to_dump, outfile, indent=4)
print("Got user {}".format(username))
gc.collect()
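# Hedged sketch (not part of the original class, name is illustrative): the
# percentage calculation used in on_tweet, factored into a standalone helper for
# clarity. Given per-post extremism flags and toxicity scores it returns the
# percentage of extremist posts and the mean toxicity as a percentage, guarding
# against an empty timeline.
def timeline_percentages(extremist_flags, toxicity_scores):
    total = len(extremist_flags)
    if total == 0:
        return 0.0, 0.0
    extremist_percentage = 100.0 * sum(1 for flag in extremist_flags if flag) / total
    toxicity_percentage = 100.0 * sum(toxicity_scores) / total
    return extremist_percentage, toxicity_percentage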
# Continuously collect Twitter posts from the sampled stream, restarting after failures
while True:
try:
        printer = IDPrinter(bearer_token=bearer, wait_on_rate_limit=True, chunk_size=10000)
        printer.add_rules(tweepy.StreamRule(value="en", tag="lang", id="lang-rule"))
        printer.sample(expansions=["author_id", "geo.place_id"], threaded=False)
print("-"*20)
gc.collect()
    except Exception:
        # Back off for 15 minutes before reconnecting the stream
        time.sleep(900)