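# WatchTower / radical_tweet_aggregator.py
#
# The lines below are residue from the hosting site's file viewer, kept as
# comments so that the module parses as Python:
#   User1342's picture
#   Update radical_tweet_aggregator.py
#   6421f36
#   raw history blame
#   No virus
#   6.87 kB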
import gc
import json
import os
from datetime import date
from pathlib import Path
import time
import tweepy
from googletrans import Translator
from predictor import predictor
import unicodedata
# Twitter API credentials, looked up in the process environment.
# Each name is None when its environment variable is not set.
consumer_token, consumer_secret, my_access_token, my_access_secret, bearer = (
    os.environ.get(variable)
    for variable in (
        'CONSUMER_TOKEN',
        'CONSUMER_SECRET',
        'ACCESS_TOKEN',
        'ACCESS_SECRET',
        'BEARER',
    )
)
# TODO: is this needed for mapping the object type after reading the pickle files? If not remove.
class grapher():
    """
    A wrapper class used for generating a graph for interactions between users.

    Every vertex carries two attributes: "id" (the index the vertex had when
    it was added) and "label" (the node name passed through
    ``str.capitalize``). Vertices are looked up by label, so names that differ
    only in letter case resolve to the same vertex.

    NOTE(review): ``Graph`` is not imported in the visible part of this file,
    so ``grapher()`` raises NameError unless the name is provided elsewhere.
    The calls used here (``vs``, ``add_vertices``, ``vcount``, ``add_edges``)
    look like python-igraph's ``Graph`` - confirm before relying on this class.
    """
    graph = None

    def __init__(self):
        """
        Constructor. Starts with an empty graph.
        """
        self.graph = Graph()

    def _get_or_create_node(self, node_name):
        """
        Return the vertex labelled with the capitalised node_name, adding it
        to the graph first when no such vertex exists yet.
        :param node_name: name of the node to look up or create
        :return: the matching (or newly created) vertex
        """
        label = node_name.capitalize()
        for node in self.graph.vs:
            if node["label"] == label:
                return node

        # Not found: append one vertex and tag it with its index and label.
        self.graph.add_vertices(1)
        index = self.graph.vcount() - 1
        node = self.graph.vs[index]
        node["id"] = index
        node["label"] = label
        return node

    def add_edge_wrapper(self, node_1_name, node_2_name, weight=1, relationship=None):
        """
        A wrapper function used to add an edge connection or node.
        Either endpoint is created when it is not in the graph yet.
        :param node_1_name: from
        :param node_2_name: to
        :param weight: accepted for interface compatibility; not stored on the edge
        :param relationship: accepted for interface compatibility; not stored on the edge
        :return: None
        """
        node_1 = self._get_or_create_node(node_1_name)
        node_2 = self._get_or_create_node(node_2_name)
        self.graph.add_edges([(node_1["id"], node_2["id"])])

    def add_node(self, node_name):
        """
        A wrapper function that adds a node with no edges to the graph.
        Does nothing when a node with the same label already exists.
        :param node_name: name of the node to add
        """
        self._get_or_create_node(node_name)
# Build the Tweepy handles from the credentials loaded above.
# `auth` wraps the four OAuth 1.0a user credentials and backs the `api` object.
auth = tweepy.OAuth1UserHandler(
    consumer_key=consumer_token,
    consumer_secret=consumer_secret,
    access_token=my_access_token,
    access_token_secret=my_access_secret,
)
api = tweepy.API(auth)

# `client` is given the same four credentials plus the bearer token.
client = tweepy.Client(
    access_token=my_access_token,
    access_token_secret=my_access_secret,
    consumer_key=consumer_token,
    consumer_secret=consumer_secret,
    bearer_token=bearer,
)
# This class is used for streaming Tweets via Tweepy
class IDPrinter(tweepy.StreamingClient):
    """
    Streaming client that screens each incoming Tweet with the predictor and,
    when a Tweet is classified as extremist, checks the author's recent posts
    and writes a JSON record for the author into the "users" folder.
    """

    # Tweets whose raw text is not longer than this many characters are skipped.
    MIN_TWEET_LENGTH = 100
    # How many of the author's posts are requested once a Tweet is flagged.
    TIMELINE_SAMPLE_SIZE = 10
    # An author is recorded when more than this percentage of the sampled
    # posts is classified as extremist.
    FLAG_THRESHOLD_PERCENT = 1

    def on_tweet(self, tweet):
        """
        Handle one Tweet delivered by the stream.
        :param tweet: the Tweet object passed in by Tweepy
        """
        # A new translator is created for every Tweet, as in the original code.
        self.translator = Translator()
        gc.collect()
        self._process_tweet(tweet)
        gc.collect()

    def _process_tweet(self, tweet):
        """
        Filter the Tweet, classify it and, when it is extremist, score and
        record its author.
        :param tweet: the Tweet object passed in by Tweepy
        """
        # Check that there is a payload before touching its fields.
        if not tweet or not tweet.data:
            return
        raw_text = tweet.data.get("text")
        if not raw_text or len(raw_text) <= self.MIN_TWEET_LENGTH:
            return
        if not tweet.data.get("author_id"):
            return

        tweet_data = raw_text.strip().replace("@", "").replace("\n", "")
        if not tweet_data:
            return

        # Ensure that Tweet is in English
        if self.translator.detect(tweet_data).lang != "en":
            return

        # Drop every character that has no ASCII representation.
        tweet_data = unicodedata.normalize('NFKD', tweet_data).encode('ascii', 'ignore').decode()
        if not tweet_data:
            return

        # Looked up only after the cheap filters so that skipped Tweets do not
        # cost an API request.
        username = client.get_user(id=tweet.author_id).data
        if username is None:
            # No user object came back; there is nothing to record against.
            return

        # Use Pinpoint to identify if a Tweet is extremist or not
        is_extremist = predictor().predict(tweet_data)
        print("user {} post extremist {} - message: {}".format(username, is_extremist, str(tweet_data)))
        if is_extremist is None or is_extremist != 1:
            return

        # If a tweet is extremist go through 10 of that users posts and identify the percentage
        # of posts that are extremist
        threshold = self._extremist_percentage(tweet.author_id)
        if threshold is None or threshold <= self.FLAG_THRESHOLD_PERCENT:
            return

        print("User {} was found to be extremist".format(username))
        self._record_user(username, threshold)
        print("Got user {}".format(username))

    def _extremist_percentage(self, author_id):
        """
        Classify the author's recent posts and return the share that is extremist.
        :param author_id: id of the user whose posts are fetched
        :return: percentage (0-100) of the fetched posts classified as
                 extremist, or None when no posts were returned
        """
        tweets = client.get_users_tweets(id=author_id, max_results=self.TIMELINE_SAMPLE_SIZE).data
        if not tweets:
            return None

        number_extreme = 0
        for users_tweet in tweets:
            if users_tweet.text is None:
                continue
            verdict = predictor().predict(users_tweet.text)
            if verdict is not None and verdict == 1:
                number_extreme += 1

        # Divide by the number of fetched posts, not by the size of the first post.
        return number_extreme / len(tweets) * 100

    def _record_user(self, username, threshold):
        """
        Write user to a file in the user folder with the percentage of extremist posts.
        :param username: user object of the author; its string form goes into
                         the file name and its ``id`` into the record
        :param threshold: percentage of the sampled posts classified as extremist
        """
        today = date.today().strftime("%b-%d-%Y")
        users_dir = Path("users")
        # Create the folder on first use; open() would fail without it.
        users_dir.mkdir(parents=True, exist_ok=True)
        file_path = users_dir / "{}-{}-radical_users.txt".format(username, today)

        # The "username" key holds the user's id, as in the original record format.
        json_to_dump = [{"username": username.id, "threshold": threshold, "date": today}]
        with open(file_path, 'w') as outfile:
            json.dump(json_to_dump, outfile, indent=4)
# Continue indefinitely, collecting Twitter posts. Whenever the stream ends or
# fails, a new streaming client is created and the stream is started again.
while True:
    try:
        printer = IDPrinter(bearer_token=bearer, wait_on_rate_limit=True, chunk_size=10000)
        # NOTE(review): the rule is re-submitted on every pass through the loop;
        # verify that it has any effect on the sampled stream requested below.
        printer.add_rules(tweepy.StreamRule(value="en", tag="lang", id="lang-rule"))
        # threaded=False: this call blocks until the stream stops.
        printer.sample(expansions=["author_id", "geo.place_id"], threaded=False)
        print("-" * 20)
        gc.collect()
    except Exception as error:
        # Catch Exception rather than everything, so that Ctrl-C and
        # SystemExit still stop the script, and report what went wrong
        # instead of hiding it.
        print("Stream failed with {}: {}".format(type(error).__name__, error))
        # Wait 15 minutes before reconnecting.
        time.sleep(900)