File size: 8,733 Bytes
8158335
 
 
 
 
07768ac
8158335
0fabb50
 
 
 
c8433b9
 
0fabb50
07768ac
0fabb50
 
 
 
 
c8433b9
0fabb50
 
 
 
 
c8433b9
0fabb50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c8433b9
5e4ad05
0fabb50
 
 
 
 
 
 
090b13e
c8433b9
0fabb50
 
090b13e
0fabb50
 
8158335
 
c8433b9
 
8158335
 
 
 
 
 
 
 
 
c8433b9
 
 
 
 
 
 
 
8158335
 
c8433b9
 
 
 
 
 
 
 
 
8158335
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
07768ac
8158335
 
 
 
 
 
07768ac
8158335
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import snscrape.modules.twitter as sntwitter
import pandas as pd
import datetime as dt
from tqdm import tqdm
import requests
from scripts import sentiment

import tweepy
import configparser
import os
import pandas as pd
from datetime import datetime, date, timedelta


def get_latest_account_tweets(handle):
    """
    Fetches the latest timeline tweets of an account through the Twitter API (tweepy).

    Credentials are read from the ``AUTHENTICATION`` section of ``tweepy_auth.ini``
    when that file exists in the working directory; otherwise they are read from
    environment variables carrying the same names.

    Args:
        handle (str): Screen name of the account whose timeline is fetched.

    Returns:
        On success, a pandas DataFrame built from the raw tweet JSON, sorted by
        ``created_at`` (ascending), with two extra columns: ``clean_text`` (the
        tweet text passed through ``sentiment.tweet_cleaner``) and ``handle``.
        On failure (API error, empty timeline, missing credentials, ...), the
        error message is printed and returned as a string instead.
    """
    try:
        if os.path.exists("tweepy_auth.ini"):
            config = configparser.ConfigParser()
            config.read("tweepy_auth.ini")
            # Get the authentication details
            authentication_section = config["AUTHENTICATION"]
            consumer_key = authentication_section["twitter_consumer_key"]
            consumer_secret = authentication_section["twitter_consumer_secret"]
            access_token = authentication_section["twitter_access_token"]
            access_token_secret = authentication_section["twitter_access_token_secret"]
        else:
            consumer_key = os.environ["twitter_consumer_key"]
            consumer_secret = os.environ["twitter_consumer_secret"]
            access_token = os.environ["twitter_access_token"]
            access_token_secret = os.environ["twitter_access_token_secret"]

        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)

        # create the API object
        api = tweepy.API(auth)

        # load the tweets from a specific user
        # NOTE(review): the user_timeline endpoint is documented to cap `count`
        # at 200 per request -- confirm whether pagination is needed here.
        tweets = api.user_timeline(
            screen_name=handle, count=10000000, tweet_mode="extended"
        )

        if not tweets:
            # Without this guard the empty frame fails below with the cryptic
            # message "'created_at'"; report the real cause instead.
            message = f"No tweets returned for handle '{handle}'."
            print("An error occurred:", message)
            return message

        df_tweets = pd.DataFrame(data=[t._json for t in tweets])
        df_tweets["created_at"] = pd.to_datetime(df_tweets["created_at"])

        # Attach the cleaned text BEFORE sorting: a plain list is assigned
        # positionally, so it must still be in the same order as the rows.
        df_tweets["clean_text"] = [
            sentiment.tweet_cleaner(tweet.full_text) for tweet in tweets
        ]

        df_tweets = df_tweets.sort_values("created_at")
        df_tweets["handle"] = df_tweets.user.iloc[0]["screen_name"]

        return df_tweets

    except tweepy.TweepError as e:
        # Handle specific error conditions
        if e.api_code == 63:
            print("User has been suspended.")
        elif e.api_code == 88:
            print("Rate limit exceeded. Please try again later.")
        else:
            print("Error occurred during API call:", str(e))
        return str(e)

    except Exception as e:
        print("An error occurred:", str(e))
        return str(e)


def get_tweets(
    handle: str,
):
    """
    Fetches the tweets an account posted over roughly the last two months.

    The search query is built internally: tweets ``from:`` the handle between
    60 days ago and today, with replies and retweets filtered out.

    Args:
        handle (str): The Twitter handle whose tweets are fetched.

    Returns:
        A pandas DataFrame with one row per tweet. It holds the fields produced
        by ``extract_tweet_info`` plus ``full_text`` (copy of ``content``),
        ``clean_text`` (``full_text`` passed through ``sentiment.tweet_cleaner``),
        ``handle`` (copy of ``username``) and ``created_at`` (copy of ``date``).
        When no tweet matches, an empty DataFrame with those columns is returned.
    """
    # Get the current date
    today = datetime.today()
    two_months_ago = today - timedelta(days=2 * 30)

    start_date = two_months_ago.strftime("%Y-%m-%d")
    end_date = today.strftime("%Y-%m-%d")

    # NOTE(review): Twitter's `until:` operator is presumably exclusive, which
    # would leave out tweets posted today -- confirm this is intended.
    query = f"from:{handle} since:{start_date} until:{end_date} -filter:replies -filter:retweets"

    fetched_tweets = sntwitter.TwitterSearchScraper(query).get_items()
    tweets = [extract_tweet_info(tweet) for tweet in tqdm(fetched_tweets)]

    if not tweets:
        # An empty DataFrame has no "content" column, so the derivations below
        # would raise KeyError. Return an empty frame with the usual schema.
        # Keep this list in sync with the keys built by extract_tweet_info.
        return pd.DataFrame(
            columns=[
                "date",
                "username",
                "content",
                "retweet_count",
                "tweet_id",
                "like_count",
                "reply_count",
                "in_reply_to_tweet_id",
                "conversation_id",
                "view_count",
                "full_text",
                "clean_text",
                "handle",
                "created_at",
            ]
        )

    df_tweets = pd.DataFrame(tweets)
    df_tweets["full_text"] = df_tweets["content"]
    df_tweets["clean_text"] = df_tweets["full_text"].apply(sentiment.tweet_cleaner)
    df_tweets["handle"] = df_tweets["username"]
    df_tweets["created_at"] = df_tweets["date"]
    return df_tweets


def get_replies(username: str, conversation_id: str, max_tweets: int) -> list:
    """
    Fetches tweets sent to a given Twitter user and returns a list of extracted tweet information.

    The search query is ``to:<username> since_id:<conversation_id> filter:safe``.
    NOTE(review): ``since_id`` presumably selects tweets newer than that ID rather
    than tweets belonging to that conversation -- confirm this is the intent.

    Args:
        username (str): The username of the Twitter user whose replies are to be fetched.
        conversation_id (str): The ID placed in the ``since_id:`` operator of the query.
        max_tweets (int): The maximum number of tweets to collect. Values of
            zero or less yield an empty list.

    Returns:
        A list of at most ``max_tweets`` dictionaries built by ``extract_tweet_info``.
    """
    print(
        f"Fetching replies for username {username} and conversation {conversation_id}"
    )
    query = f"to:{username} since_id:{conversation_id} filter:safe"

    tweets_list = []
    if max_tweets <= 0:
        return tweets_list

    for tweet in tqdm(sntwitter.TwitterSearchScraper(query).get_items()):
        tweets_list.append(extract_tweet_info(tweet))
        # Stop as soon as the cap is reached: the previous `i > max_tweets`
        # test collected one tweet too many and only stopped after pulling
        # yet another item from the scraper.
        if len(tweets_list) >= max_tweets:
            break
    return tweets_list


def get_tweet_by_id_and_username(username: str, tweet_id: str):
    """
    Runs a Twitter search whose query is the URL of a specific tweet.

    Args:
        username (str): The username of the Twitter user who posted the tweet.
        tweet_id (str): The ID of the tweet.

    Returns:
        The result of ``TwitterSearchScraper(<tweet url>).get_items()``, i.e. an
        iterable of search results rather than a single tweet object.
        NOTE(review): searching for a tweet URL may yield tweets that reference
        the URL instead of the tweet itself -- verify against callers.
    """
    status_url = "https://twitter.com/{}/status/{}".format(username, tweet_id)
    url_scraper = sntwitter.TwitterSearchScraper(status_url)
    return url_scraper.get_items()


def extract_tweet_info(tweet):
    """
    Flattens a tweet object into a plain dictionary.

    Args:
        tweet: An object exposing ``date``, ``user.username``, ``rawContent``,
            ``retweetCount``, ``id``, ``likeCount``, ``replyCount``,
            ``inReplyToTweetId``, ``conversationId`` and ``viewCount``.

    Returns:
        A dictionary with the keys ``date``, ``username``, ``content``,
        ``retweet_count``, ``tweet_id``, ``like_count``, ``reply_count``,
        ``in_reply_to_tweet_id``, ``conversation_id`` and ``view_count``
        (in that order).
    """
    # The author's name lives on the nested user object, so it is read
    # explicitly; every other field is a direct attribute of the tweet.
    record = {"date": tweet.date, "username": tweet.user.username}

    # (output key, tweet attribute) pairs, in output order.
    direct_fields = (
        ("content", "rawContent"),
        ("retweet_count", "retweetCount"),
        ("tweet_id", "id"),
        ("like_count", "likeCount"),
        ("reply_count", "replyCount"),
        ("in_reply_to_tweet_id", "inReplyToTweetId"),
        ("conversation_id", "conversationId"),
        ("view_count", "viewCount"),
    )
    for key, attribute in direct_fields:
        record[key] = getattr(tweet, attribute)
    return record


def get_follower_ids(username: str, limit: int = 20):
    """
    Picks one tweet a user posted in a fixed date window and fetches replies sent to that user.

    NOTE: despite its name, this function does not return follower IDs. It
    searches the user's tweets between 2023-03-10 and 2023-03-22, takes the
    last tweet of the result and passes its ID to ``get_replies``.

    Args:
        username (str): The Twitter handle to search tweets and replies for.
        limit (int): Currently unused; kept so existing callers keep working.

    Returns:
        A tuple ``(one_tweet, replies)``: ``one_tweet`` is the dictionary built
        by ``extract_tweet_info`` for the selected tweet and ``replies`` is the
        list returned by ``get_replies`` (up to 1000 entries). When the search
        yields no tweet, ``(None, [])`` is returned.
    """
    start_date = dt.date(year=2023, month=3, day=10)
    end_date = dt.date(year=2023, month=3, day=22)
    query = f"from:{username} since:{start_date} until:{end_date}"

    # get_tweets() only accepts a handle and returns a DataFrame, so the
    # date-bounded query is run directly to obtain a list of tweet dicts.
    tweets = [
        extract_tweet_info(tweet)
        for tweet in sntwitter.TwitterSearchScraper(query).get_items()
    ]
    if not tweets:
        return None, []

    one_tweet = tweets[-1]
    one_tweet_id = one_tweet["tweet_id"]

    replies = get_replies(
        username=username, conversation_id=one_tweet_id, max_tweets=1000
    )

    return one_tweet, replies


def get_twitter_account_info(twitter_handle: str) -> dict:
    """
    Collects basic profile information of a Twitter user using snscrape.

    Args:
        twitter_handle (str): The Twitter username to retrieve information for.

    Returns:
        dict: A dictionary with the keys ``name``, ``username``, ``user_id``,
        ``follower_count``, ``friends_count`` and ``verified``. ``verified`` is
        the string ``"false"`` only when the profile's ``verified`` attribute
        stringifies to "false" (case-insensitive); any other value, including
        ``None``, yields ``"true"``.
    """
    # The scraper's `entity` attribute holds the profile record.
    profile = sntwitter.TwitterUserScraper(twitter_handle).entity

    account = {
        "name": profile.displayname,
        "username": profile.username,
        "user_id": profile.id,
        "follower_count": profile.followersCount,
        "friends_count": profile.friendsCount,
    }

    # Normalise the verification flag to a lowercase string.
    reported_unverified = str(profile.verified).lower() == "false"
    account["verified"] = "false" if reported_unverified else "true"
    return account


if __name__ == "__main__":
    ## Testing extracting tweets from an account
    # get_tweets() takes only the handle and builds its own query: tweets from
    # the account over roughly the last two months, replies and retweets excluded.
    account = "taylorlorenz"
    print(f"account: {account}")
    df_tweets = get_tweets(handle=account)

    df_tweets = df_tweets.sort_values("in_reply_to_tweet_id")
    # Save output (comment out to skip writing the file)
    df_tweets.to_csv("df_tweets.csv")

    print(df_tweets.head(2))
    print(df_tweets.tail(2))
    print(f"Total Tweets: {len(df_tweets)}")

    ## Testing extracting conversation threads from a conversation ID
    conversation_id = (
        1620650202305798144  # A tweet from elon musk about turbulent times
    )
    max_tweets = 3000
    tweets = get_replies(
        username="elonmusk", conversation_id=conversation_id, max_tweets=max_tweets
    )
    df_replies = pd.DataFrame(data=tweets)

    # Uncomment to save output
    # df_replies.to_csv("df_replies.csv")
    print(
        f"Number of extracted tweets from conversation_id: {conversation_id}, {len(tweets)}"
    )