Spaces:
Runtime error
Runtime error
import gc | |
import json | |
import os | |
from datetime import date | |
from pathlib import Path | |
import unicodedata | |
# Twitter API credentials are read from the process environment. A variable
# that is not set leaves the corresponding name bound to None.
consumer_token = os.environ.get('CONSUMER_TOKEN')
consumer_secret = os.environ.get('CONSUMER_SECRET')
my_access_token = os.environ.get('ACCESS_TOKEN')
my_access_secret = os.environ.get('ACCESS_SECRET')
bearer = os.environ.get('BEARER')
import time | |
import tweepy | |
from googletrans import Translator | |
from predictor import predictor | |
class grapher():
    """
    A wrapper class used for generating a graph for interactions between users.

    Every vertex carries two attributes: ``"label"`` (the capitalised node
    name) and ``"id"`` (the vertex index at the moment it was created).
    """
    # NOTE(review): ``Graph`` is not imported in the visible part of this file.
    # The vs/add_vertices/vcount/add_edges calls look like igraph.Graph --
    # confirm and add the matching import.
    graph = None

    def __init__(self):
        """
        Constructor. Starts from an empty graph.
        """
        self.graph = Graph()

    def _get_or_create_vertex(self, node_name):
        """
        Return the vertex labelled with the capitalised ``node_name``,
        appending a new vertex to the graph when no such vertex exists yet.
        :param node_name: name of the node; matched after ``str.capitalize()``
        :return: the existing or newly created vertex
        """
        label = node_name.capitalize()
        for vertex in self.graph.vs:
            if vertex["label"] == label:
                return vertex

        # Not found: append one vertex and tag it with its own index so that
        # edges can later be addressed through the "id" attribute.
        self.graph.add_vertices(1)
        index = self.graph.vcount() - 1
        self.graph.vs[index]["id"] = index
        self.graph.vs[index]["label"] = label
        return self.graph.vs[index]

    def add_edge_wrapper(self, node_1_name, node_2_name, weight=1, relationship=None):
        """
        A wrapper function used to add an edge between two nodes, creating
        either node first when it is not in the graph yet.
        :param node_1_name: from
        :param node_2_name: to
        :param weight: currently unused; kept for interface compatibility
        :param relationship: currently unused; kept for interface compatibility
        :return: None
        """
        node_1 = self._get_or_create_vertex(node_1_name)
        node_2 = self._get_or_create_vertex(node_2_name)
        self.graph.add_edges([(node_1["id"], node_2["id"])])

    def add_node(self, node_name):
        """
        A wrapper function that adds a node with no edges to the graph.
        Does nothing when a node with the same capitalised name already exists.
        :param node_name: name of the node to add
        :return: None
        """
        self._get_or_create_vertex(node_name)
global_oauth1_user_handler = None

# OAuth 1.0a user-context handler, built from the four credentials read from
# the environment, and the API object constructed on top of it.
auth = tweepy.OAuth1UserHandler(
    consumer_key=consumer_token,
    consumer_secret=consumer_secret,
    access_token=my_access_token,
    access_token_secret=my_access_secret,
)
api = tweepy.API(auth)

# Client object constructed with both the bearer token and the same four
# OAuth 1.0a credentials.
client = tweepy.Client(
    bearer_token=bearer,
    consumer_key=consumer_token,
    consumer_secret=consumer_secret,
    access_token=my_access_token,
    access_token_secret=my_access_secret,
)
class IDPrinter(tweepy.StreamingClient):
    """
    Streaming client that screens each received tweet and records flagged authors.

    ``on_tweet`` cleans the tweet text, keeps only tweets detected as English,
    runs ``predictor().predict`` on the text and, when that prediction is
    positive, scores up to ten of the author's tweets. Authors whose share of
    positive tweets exceeds 1 percent are appended to a dated file inside the
    ``users`` directory.
    """

    @staticmethod
    def _is_flagged(prediction):
        """
        Tell whether a ``predictor().predict`` result counts as positive.
        :param prediction: value returned by ``predict``; may be None
        :return: True when the prediction is not None and equals 1
        """
        # NOTE(review): presumably 1/True means "extremist" -- confirm against
        # the predictor module, which is not visible here.
        return prediction is not None and prediction == 1

    def on_tweet(self, tweet):
        """
        Handle one tweet received from the stream.
        :param tweet: the tweet object delivered by the streaming client
        :return: None
        """
        gc.collect()
        try:
            self._screen_tweet(tweet)
        finally:
            # Collect again on every exit path, including early returns.
            gc.collect()

    def _screen_tweet(self, tweet):
        """
        Run the screening pipeline for one tweet; returns early as soon as the
        tweet is found not to qualify.
        :param tweet: the tweet object delivered by the streaming client
        :return: None
        """
        # Validate the payload before touching its fields.
        data = tweet.data if tweet else None
        if not data:
            return
        raw_text = data.get("text") or ""
        if len(raw_text) <= 100 or not data.get("author_id"):
            return

        tweet_data = raw_text.strip().replace("@", "").replace("\n", "")
        if not tweet_data:
            return

        # Build the translator once per client instead of once per tweet.
        translator = getattr(self, "translator", None)
        if translator is None:
            translator = self.translator = Translator()
        if translator.detect(tweet_data).lang != "en":
            return

        # Drop everything that cannot be represented in ASCII.
        tweet_data = unicodedata.normalize('NFKD', tweet_data).encode('ascii', 'ignore').decode()

        # The user lookup is an API call, so it is only made for tweets that
        # passed the language filter.
        username = client.get_user(id=tweet.author_id).data

        # NOTE(review): a new predictor is built for every prediction, exactly
        # as before; reuse a single instance if predictor is stateless.
        is_extremist = predictor().predict(tweet_data)
        print("user {} post extremist {} - message: {}".format(username, is_extremist, str(tweet_data)))
        if not self._is_flagged(is_extremist):
            return

        # ``data`` is None when the lookup returns no tweets.
        tweets = client.get_users_tweets(id=tweet.author_id, max_results=10).data or []
        if not tweets:
            return

        number_extreme = 0
        for users_tweet in tweets:
            if users_tweet.text is None:
                continue
            if self._is_flagged(predictor().predict(users_tweet.text)):
                number_extreme = number_extreme + 1
        print(number_extreme)

        # Percentage of the fetched tweets that were flagged.
        threshold = number_extreme / len(tweets) * 100
        print("Threshold {}".format(threshold))
        if threshold > 1:
            self._record_user(username, tweet.author_id, threshold)

    def _record_user(self, username, author_id, threshold):
        """
        Append one flagged author to today's ``users/<date>-radical_users.txt``.
        :param username: user object returned by the user lookup; may be None
        :param author_id: author id of the triggering tweet, used when the
            user lookup returned nothing
        :param threshold: percentage of the author's scored tweets that were flagged
        :return: None
        """
        # Format the date once so the file name and the record always agree.
        today = date.today().strftime("%b-%d-%Y")
        file_name = os.path.join("users", "{}-radical_users.txt".format(today))
        print("User {} was found to be extremist".format(username))

        # Make sure the target directory exists before opening the file.
        Path(file_name).parent.mkdir(parents=True, exist_ok=True)
        user_id = username.id if username is not None else author_id
        # NOTE(review): each call appends a separate JSON array, so the file as
        # a whole is a concatenation of JSON documents, not one document.
        with open(file_name, 'a+') as outfile:
            json_to_dump = [{"username": user_id, "threshold": threshold, "date": today}]
            json.dump(json_to_dump, outfile, indent=4)
        print("Got user {}".format(username))
# calling the api
# Keep the stream alive forever: build a fresh client, block on the sample
# stream, and after any failure wait 15 minutes before reconnecting.
while True:
    try:
        printer = IDPrinter(bearer_token=bearer, wait_on_rate_limit=True, chunk_size=10000)
        # NOTE(review): stream rules are documented for the filtered stream;
        # confirm this rule has any effect on sample(), and that "en" (rather
        # than a "lang:en" operator) is the intended rule value.
        printer.add_rules(tweepy.StreamRule(value="en", tag="lang", id="lang-rule"))
        printer.sample(expansions=["author_id", "geo.place_id"], threaded=False)
        print("-"*20)
        gc.collect()
    except Exception as error:
        # Only ordinary errors are retried; KeyboardInterrupt and SystemExit
        # propagate so the script can actually be stopped. The error is
        # reported instead of being silently discarded.
        print("Stream failed with {!r}; retrying in 900 seconds".format(error))
        time.sleep(900)