sd / ttt.py
decula
7b_rag.py
0396b0d
import tweepy
import datetime
import os
def get_tweets_from_user_list_v2(user_list, client, tweets_per_user=10):
"""
Retrieves the latest tweets from a list of Twitter users within the last 24 hours using Tweepy API v2.
"""
all_tweets = []
now = datetime.datetime.now(datetime.timezone.utc)
yesterday = now - datetime.timedelta(days=1)
for username in user_list:
try:
user = client.get_user(username=username)
if user.errors:
print(f"Error getting user ID for {username}: {user.errors}")
continue
user_id = user.data['id']
response = client.get_users_tweets(
id=user_id,
max_results=tweets_per_user,
tweet_fields=["created_at", "text"],
)
if response.errors:
print(f"Error fetching tweets for {username} (ID: {user_id}): {response.errors}")
continue
tweets = response.data
if tweets:
for tweet in tweets:
tweet_time = tweet['created_at']
if tweet_time >= yesterday:
all_tweets.append(tweet)
else:
break
except Exception as e:
print(f"Unexpected error fetching tweets for {username}: {e}")
continue
return all_tweets
def authenticate_v2():
"""Authenticates with the Twitter API v2 using a Bearer Token."""
bearer_token = (f"AAAAAAAAAAAAAAAAAAAAAKmVzAEAAAAAhVM%2BPoJytUvDoQ%2FgIGfLdJXdocU%3DvS5ZF10O60kLJcY6MGiErCDHV7awcUKB75QqWpIlKh84rK5CFw")
if not bearer_token:
print("Error: Missing Twitter Bearer Token. Please set the environment variable.")
return None
try:
client = tweepy.Client(bearer_token)
user = client.get_me() # Check authentication
if user.errors:
print(f"Authentication check failed: {user.errors}")
return None
print("Authentication successful (API v2)!")
return client
except Exception as e:
print(f"Authentication failed (API v2): {e}")
return None
if __name__ == '__main__':
client = authenticate_v2()
if client is None:
print("Exiting due to authentication failure.")
exit()
user_list = ["elonmusk", "BillGates", "NASA"]
tweets = get_tweets_from_user_list_v2(user_list, client, tweets_per_user=5)
if tweets:
print(f"Found {len(tweets)} tweets from the last 24 hours:")
for tweet in tweets:
print(f"Text: {tweet['text']}\n")
print(f"Tweeted at: {tweet['created_at']}\n")
else:
print("No tweets found in the last 24 hours or an error occurred.")