Spaces:
Sleeping
Sleeping
import datetime | |
import re | |
import sys | |
import time | |
import tweepy | |
from Pinpoint.ConfigManager import ConfigManager | |
class Twitter: | |
''' | |
Twitter aggregator class | |
''' | |
tweepy_api = None | |
def __init__(self): | |
''' | |
Constrcutor | |
''' | |
twitter_config = ConfigManager.getTwitterConfig() | |
consumer_key = twitter_config["consumer_key"] | |
consumer_secret = twitter_config["consumer_secret"] | |
access_token = twitter_config["access_token"] | |
access_token_secret = twitter_config["access_token_secret"] | |
auth = tweepy.OAuthHandler(consumer_key, consumer_secret) | |
auth.set_access_token(access_token, access_token_secret) | |
self.tweepy_api = tweepy.API(auth) | |
def get_tweet(self, tweet_info, attempts=1): | |
''' | |
returns a list of up to two tweets. This is because the provided tweet could be a quoted tweet. If this is the case | |
we take that as two seperate tweets. Otherwise one tweet is returned with the necessary extracted data. | |
:param tweet_info: | |
:return: a list of up to two tweets with the necessary data extracted as defined in the serilizer. | |
''' | |
# If we've received several errors in a row then it's probably not going to fix itself. | |
if attempts > 5: | |
return [] | |
list_of_tweets = [] | |
tweet = None | |
try: | |
retweets = tweet_info.retweet_count | |
likes = tweet_info.favorite_count | |
date = tweet_info.created_at.timestamp() | |
# Gets full tweet if normal tweet or re-tweet | |
if tweet_info.retweeted: | |
try: | |
tweet = tweet_info.retweeted_status.full_text | |
retweets = tweet_info.retweeted_status.retweet_count | |
likes = tweet_info.retweeted_status.favorite_count | |
tweet_info = self.tweepy_api.get_status(id=tweet_info.id, tweet_mode='extended') | |
# Gets author of tweet | |
source = tweet_info.full_text.split(":", 1)[0] | |
regex = r"RT @(.+)" | |
matchObj = re.match(regex, source) | |
if matchObj: | |
source = matchObj.group(1) | |
else: | |
source = "self" | |
except AttributeError as e: | |
print(e) | |
pass | |
else: | |
# Gets full tweet and sets author to self | |
tweet = tweet_info.full_text | |
source = "self" | |
# For quotes retweets we take the quoted tweet and the parent tweet as two seperate tweets. | |
if tweet_info.is_quote_status: | |
try: | |
quoted_id = tweet_info.quoted_status_id | |
quoted_tweet_info = self.tweepy_api.get_status(id=quoted_id, tweet_mode='extended') | |
quoted_tweet_text = quoted_tweet_info.full_text | |
quoted_source = quoted_tweet_info.user.name | |
quoted_retweets = quoted_tweet_info.retweet_count | |
quoted_likes = quoted_tweet_info.favorite_count | |
quoted_date = quoted_tweet_info.created_at.timestamp() | |
# As this function can return two tweets (i.e. a quoted tweet and normal tweet) the tweets are added to a list | |
list_of_tweets.append( | |
Serializer.createPostDict(date=quoted_date, post_text=quoted_tweet_text, likes=quoted_likes, | |
comments='', shares=quoted_retweets, source=quoted_source)) | |
except AttributeError as e: | |
print("Tweepy Twitter api error. On attempt {} \n {}".format(attempts, e)) | |
pass | |
# As this function can return two tweets (i.e. a quoted tweet and normal tweet) the tweets are added to a list | |
if tweet is not None: | |
list_of_tweets.append( | |
Serializer.createPostDict(date=date, post_text=tweet, likes=likes, comments='', shares=retweets, | |
source=source)) | |
except tweepy.RateLimitError as e: | |
print("Tweepy Twitter api rate limit reached. On attempt {} \n {}".format(attempts, e)) | |
time.sleep(300) | |
return self.get_tweet(tweet_info, attempts + 1) # if error, try again. | |
except tweepy.TweepError as e: | |
print("Tweepy Twitter api error. On attempt {} \n {}".format(attempts, e)) | |
pass | |
return list_of_tweets | |
def get_posts(self, username, attempts=1): | |
''' | |
Loops through all tweets for the provided user | |
:param username: | |
:return: a list of serilised tweets | |
''' | |
# If a participant has enteres their username with spaces in error this will format it. | |
username = username.replace(" ", "") | |
# Checks attempts. If exceeded return empty list. | |
if attempts > 3: | |
return [] | |
list_of_tweets = [] | |
# If an @ symbol has been added to the string then it's removed. | |
if str(username).startswith("@"): | |
username = username[1:] | |
try: | |
for tweet_info in tweepy.Cursor(self.tweepy_api.user_timeline, id=username, tweet_mode='extended').items(): | |
# As this function can return two tweets (i.e. a quoted tweet and normal tweet) the tweets are added to a list | |
list_of_tweets = list_of_tweets + self.get_tweet(tweet_info) | |
except tweepy.error.TweepError as e: | |
print("Tweepy Twitter api error on user {}. On Attempt {} .\n {}".format(username, attempts, e)) | |
time.sleep(300) | |
return self.get_posts(username, sys.maxsize) # Unlinkely to be an error that can be fixed by waiting | |
return list_of_tweets | |
def get_user(self, user_name): | |
""" | |
Gets a Twepy user object for a given user name | |
:param user_name: a string representation of a Twitter username | |
:return: a Tweepy user object, None if no user found | |
""" | |
user = None | |
try: | |
user = self.tweepy_api.get_user(user_name) | |
except: | |
pass | |
return user | |
def is_valid_user(self, user_name): | |
""" | |
Gets a Twepy user object for a given user name | |
:param user_name: a string representation of a Twitter username | |
:return: None if doesn't exist or suspended, user object if valid. | |
""" | |
user = None | |
try: | |
user = self.tweepy_api.get_user(user_name) | |
if user.suspended: | |
user = None | |
except: | |
pass | |
return user | |
def get_user_post_frequency(self, user_name): | |
""" | |
A utility function used to retrieve a users post frequency | |
:param user_name: | |
:return: | |
""" | |
user = self.tweepy_api.get_user(user_name) | |
created_at_time = user.created_at | |
number_of_posts = user.statuses_count | |
current_date = datetime.datetime.now() | |
elapse_time = current_date - created_at_time | |
frequency = number_of_posts / elapse_time.days | |
return frequency | |
def get_follower_following_frequency(self, user_name): | |
""" | |
A utility function used to retrieve a users follower/ following frequency | |
:param user_name: | |
:return: | |
""" | |
user = self.tweepy_api.get_user(user_name) | |
followers_count = user.followers_count | |
following_count = user.friends_count | |
ration = following_count / followers_count | |
return ration | |