#!/usr/bin/env python3
"""
Twitter Content Analyzer

A comprehensive Twitter data collection and analysis tool with automated
scheduling capabilities.
"""

import os
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from collections import Counter

import streamlit as st
import pandas as pd
import plotly.express as px
import pytz
from pymongo import MongoClient
import google.generativeai as genai
from apify_client import ApifyClient
from dotenv import load_dotenv

# =============================================================================
# CONSTANTS
# =============================================================================
DEFAULT_USERNAME = "narendramodi"
DEFAULT_DAYS_BACK = 7
IST_TIMEZONE = 'Asia/Kolkata'
UTC_TIMEZONE = 'UTC'

# Twitter API date format, e.g. "Mon Jul 08 09:31:59 +0000 2013"
TWITTER_DATE_FORMAT = "%a %b %d %H:%M:%S %z %Y"

# MongoDB collection names
TWEETS_COLLECTION = "tweets"
SCHEDULER_USERS_COLLECTION = "scheduler_users"

# Streamlit page config
PAGE_CONFIG = {
    "page_title": "Twitter Scraper & Analyzer",
    "page_icon": "🐦",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}

# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def convert_to_ist(utc_dt: datetime) -> datetime:
    """Convert a UTC datetime to Indian Standard Time."""
    if utc_dt.tzinfo is None:
        utc_dt = pytz.utc.localize(utc_dt)
    return utc_dt.astimezone(pytz.timezone(IST_TIMEZONE))


def safe_get_nested(data: Dict, keys: List[str], default=None):
    """Safely get nested dictionary values."""
    for key in keys:
        if isinstance(data, dict) and key in data:
            data = data[key]
        else:
            return default
    return data


def format_large_number(num: int) -> str:
    """Format large numbers with thousands separators."""
    return f"{num:,}" if num > 0 else "N/A"

# =============================================================================
# CONFIGURATION MANAGEMENT
# =============================================================================
class AppConfig:
    """Centralized configuration management."""

    def __init__(self, env_path: str = ".env.local"):
        load_dotenv(dotenv_path=env_path)
        self._validate_config()

    @property
    def mongodb_uri(self) -> Optional[str]:
        return os.getenv("MONGODB_URI")

    @property
    def apify_api_key(self) -> Optional[str]:
        return os.getenv("APIFY_API_KEY")

    @property
    def gemini_api_key(self) -> Optional[str]:
        return os.getenv("GEMINI_API_KEY")

    def _validate_config(self) -> None:
        """Validate essential configuration.

        The accessors above must be properties: checking a plain bound
        method here would always be truthy, so the validation would never
        fire, and callers like DatabaseManager would receive a method
        object instead of a string.
        """
        if not self.apify_api_key:
            raise ValueError("APIFY_API_KEY is required but not found in environment variables")

# =============================================================================
# DATABASE MANAGEMENT
# =============================================================================
class DatabaseManager:
    """Handles all MongoDB operations."""

    def __init__(self, uri: Optional[str]):
        self.client = None
        self.db = None
        self.is_connected = False
        self._connect(uri)

    def _connect(self, uri: Optional[str]) -> None:
        """Establish the MongoDB connection."""
        if not uri:
            logger.warning("No MongoDB URI provided. Running in offline mode.")
            self._setup_dummy_collections()
            return
        try:
            self.client = MongoClient(uri, serverSelectionTimeoutMS=5000)
            self.client.admin.command('ping')
            self.db = self.client["DataCollector"]
            self.tweets_collection = self.db[TWEETS_COLLECTION]
            self.scheduler_users_collection = self.db[SCHEDULER_USERS_COLLECTION]
            self.is_connected = True
            logger.info("✅ MongoDB connected successfully")
        except Exception as e:
            logger.error(f"⚠️ MongoDB connection failed: {e}")
            logger.info("🔄 Running in offline mode - data will not be stored")
            self._setup_dummy_collections()

    def _setup_dummy_collections(self) -> None:
        """Set up no-op stand-ins for offline mode."""
        class DummyCollection:
            def update_one(self, *args, **kwargs): pass
            def find(self, *args, **kwargs): return []
            def find_one(self, *args, **kwargs): return None
            def insert_one(self, *args, **kwargs): pass

        self.tweets_collection = DummyCollection()
        self.scheduler_users_collection = DummyCollection()
        self.is_connected = False
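
# The dummy collections implement the null-object pattern: call sites issue
# the same pymongo-style calls whether or not a database is attached. A
# minimal sketch (illustrative; the URI below is made up):
#
#     db = DatabaseManager(uri=None)               # offline mode
#     db.tweets_collection.insert_one({"x": 1})    # silently ignored
#     db.tweets_collection.find({})                # -> []
#
#     db = DatabaseManager("mongodb://localhost:27017")  # real connection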

# =============================================================================
# API SERVICES
# =============================================================================
class ApifyService:
    """Handles Apify API interactions for Twitter data collection."""

    ACTOR_ID = "CJdippxWmn9uRfooo"

    def __init__(self, api_key: str):
        self.client = ApifyClient(api_key)

    def _run_actor(self, run_input: Dict[str, Any]) -> Tuple[List[Dict], str]:
        """Execute the Apify actor and retrieve its dataset."""
        try:
            run = self.client.actor(self.ACTOR_ID).call(run_input=run_input)
            dataset_id = run["defaultDatasetId"]
            data = list(self.client.dataset(dataset_id).iterate_items())
            return data, dataset_id
        except Exception as e:
            logger.error(f"Apify actor execution failed: {e}")
            raise

    def fetch_account_tweets(self, username: str, since: str, until: str) -> Tuple[List[Dict], str]:
        """Fetch tweets posted by a specific account."""
        # Handle both simple date (YYYY-MM-DD) and full timestamp
        # (YYYY-MM-DD_HH:MM:SS) formats.
        since_formatted = f"{since}_UTC" if "_" in since else f"{since}_00:00:00_UTC"
        until_formatted = f"{until}_UTC" if "_" in until else f"{until}_23:59:59_UTC"
        run_input = {
            "from": username.strip(),
            "since": since_formatted,
            "until": until_formatted,
            "queryType": "Latest",
            "include:nativeretweets": True,
        }
        with st.spinner(f"Fetching tweets for @{username} from {since} to {until}..."):
            data, dataset_id = self._run_actor(run_input)
            st.info(f"🔍 Query Details: from:{username} | Raw results: {len(data)} tweets")
            return data, dataset_id

    def fetch_account_comments(self, username: str, since: str, until: str) -> Tuple[List[Dict], str]:
        """Fetch comments/replies directed to a specific account."""
        # Handle both simple date (YYYY-MM-DD) and full timestamp
        # (YYYY-MM-DD_HH:MM:SS) formats.
        since_formatted = f"{since}_UTC" if "_" in since else f"{since}_00:00:00_UTC"
        until_formatted = f"{until}_UTC" if "_" in until else f"{until}_23:59:59_UTC"
        run_input = {
            "to": username.strip(),
            "since": since_formatted,
            "until": until_formatted,
            "queryType": "Latest",
        }
        with st.spinner(f"Fetching comments for @{username} from {since} to {until}..."):
            data, dataset_id = self._run_actor(run_input)
            st.info(f"🔍 Query Details: to:@{username} | Raw results: {len(data)} comments")
            return data, dataset_id
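
# How the date-window normalization above behaves (illustrative values):
#
#     "2024-05-01"           -> "2024-05-01_00:00:00_UTC"   (since: start of day)
#     "2024-05-07"           -> "2024-05-07_23:59:59_UTC"   (until: end of day)
#     "2024-05-01_12:30:00"  -> "2024-05-01_12:30:00_UTC"   (already timestamped)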

class GeminiService:
    """Handles Google Generative AI interactions."""

    def __init__(self, api_key: str):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-1.5-flash')

    def generate_analysis(self, tweets_df: pd.DataFrame, context: str) -> str:
        """Generate an AI-powered analysis of tweets."""
        if tweets_df.empty:
            return "No tweets provided for analysis."
        with st.spinner("Generating AI summary with Gemini..."):
            try:
                tweets_text = self._format_tweets_for_analysis(tweets_df)
                prompt = self._create_analysis_prompt(context, tweets_text)
                response = self.model.generate_content(prompt)
                return response.text
            except Exception as e:
                logger.error(f"Gemini analysis failed: {e}")
                return f"Error generating summary: {str(e)}"

    def _format_tweets_for_analysis(self, tweets_df: pd.DataFrame) -> str:
        """Format tweets as a numbered list for AI analysis."""
        return "\n\n".join([
            f"{i}. @{row.Username}: {row.Text} (Likes: {row.Likes}, Retweets: {row.Retweets})"
            for i, row in enumerate(tweets_df.itertuples(), 1)
        ])

    def _create_analysis_prompt(self, context: str, tweets_text: str) -> str:
        """Create the analysis prompt for Gemini."""
        return f"""
{context}

Here are the tweets to analyze:
{tweets_text}

Please provide a comprehensive analysis covering:
1. **Main Themes & Topics:** What are the key subjects of discussion?
2. **Overall Sentiment:** What is the general tone (positive, negative, neutral)?
3. **Key Insights & Patterns:** Are there any notable trends or surprising findings?
4. **Top Recommendations:** Provide 5 actionable suggestions for the brand/party to improve their strategy based on this feedback.

Format the response clearly using Markdown.
"""

# =============================================================================
# DATA PROCESSING
# =============================================================================
class TweetDataProcessor:
    """Processes raw tweet data into a structured format."""

    def process_tweets(self, raw_data: List[Dict[str, Any]],
                       target_username: Optional[str] = None) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """Transform raw API data into a clean DataFrame and metrics."""
        processed_data = []
        hashtags_counter = Counter()
        mentions_counter = Counter()
        all_author_data = []
        skipped_count = 0
        error_count = 0

        for item in raw_data:
            try:
                processed_tweet = self._process_single_tweet(
                    item, hashtags_counter, mentions_counter, all_author_data, target_username)
                if processed_tweet:
                    processed_data.append(processed_tweet)
                else:
                    skipped_count += 1
            except Exception as e:
                error_count += 1
                # Only log individual errors in debug mode
                if st.session_state.get('debug_mode', False):
                    logger.warning(f"Skipping tweet due to processing error: {e}")
                    st.warning(f"Skipping a tweet due to processing error: {e}")

        # Summarize skipped items only if significant
        if skipped_count > 0 and st.session_state.get('debug_mode', False):
            st.info(f"ℹ️ Skipped {skipped_count} items (likely mock/invalid data)")
        if error_count > 0:
            st.warning(f"⚠️ {error_count} items had processing errors")

        # Extract the most complete account details
        account_details = self._extract_best_account_details(all_author_data, target_username)

        # Create the DataFrame and calculate engagement metrics from tweet data
        df = pd.DataFrame(processed_data)
        engagement_metrics = self._calculate_engagement_metrics(df, target_username)

        # Merge engagement metrics into account_details
        if account_details:
            account_details.update(engagement_metrics)

        metrics = {
            "top_hashtags": hashtags_counter.most_common(5),
            "top_mentions": mentions_counter.most_common(5),
            "account_details": account_details
        }
        return df, metrics
    def _calculate_engagement_metrics(self, df: pd.DataFrame,
                                      target_username: Optional[str] = None) -> Dict:
        """Calculate comprehensive engagement metrics from tweet data."""
        if df.empty:
            return self._get_empty_metrics()

        # Filter to tweets from the target user, if specified
        if target_username:
            user_tweets = df[df['Username'].str.lower() == target_username.lower()]
        else:
            user_tweets = df
        if user_tweets.empty:
            return self._get_empty_metrics()

        # Basic engagement totals
        likes_count = user_tweets['Likes'].sum() if 'Likes' in user_tweets.columns else 0
        views_count = user_tweets['Views'].sum() if 'Views' in user_tweets.columns else 0
        reply_count = user_tweets['Replies'].sum() if 'Replies' in user_tweets.columns else 0
        repost_count = user_tweets['Retweets'].sum() if 'Retweets' in user_tweets.columns else 0
        tweet_count = len(user_tweets)

        # Content quality metrics
        avg_likes_per_tweet = likes_count / tweet_count if tweet_count > 0 else 0
        avg_views_per_tweet = views_count / tweet_count if tweet_count > 0 else 0
        avg_engagement_rate = ((likes_count + repost_count) / views_count * 100) if views_count > 0 else 0

        # Content length analysis
        if 'Text' in user_tweets.columns:
            text_lengths = user_tweets['Text'].astype(str).str.len()
            avg_tweet_length = text_lengths.mean()
            longest_tweet_length = text_lengths.max()
            shortest_tweet_length = text_lengths.min()
        else:
            avg_tweet_length = longest_tweet_length = shortest_tweet_length = 0

        # Media usage metrics
        if 'Has_Media' in user_tweets.columns:
            tweets_with_media = user_tweets['Has_Media'].sum()
            media_usage_percentage = (tweets_with_media / tweet_count * 100) if tweet_count > 0 else 0
            # Media effectiveness
            media_tweets = user_tweets[user_tweets['Has_Media'] == True]
            no_media_tweets = user_tweets[user_tweets['Has_Media'] == False]
            avg_likes_with_media = media_tweets['Likes'].mean() if len(media_tweets) > 0 else 0
            avg_likes_without_media = no_media_tweets['Likes'].mean() if len(no_media_tweets) > 0 else 0
        else:
            tweets_with_media = media_usage_percentage = 0
            avg_likes_with_media = avg_likes_without_media = 0

        # Hashtag and mention analysis
        if 'Hashtags' in user_tweets.columns:
            # Count hashtags from the Hashtags field (comma-separated string)
            hashtag_counts = user_tweets['Hashtags'].astype(str).apply(
                lambda x: len([h.strip() for h in x.split(',') if h.strip()]))
            total_hashtags_used = hashtag_counts.sum()
            avg_hashtags_per_tweet = hashtag_counts.mean()
            tweets_with_hashtags_percentage = ((hashtag_counts > 0).sum() / tweet_count * 100) if tweet_count > 0 else 0
        elif 'Hashtag_Count' in user_tweets.columns:
            # Fall back to Hashtag_Count if available
            total_hashtags_used = user_tweets['Hashtag_Count'].sum()
            avg_hashtags_per_tweet = user_tweets['Hashtag_Count'].mean()
            tweets_with_hashtags_percentage = ((user_tweets['Hashtag_Count'] > 0).sum() / tweet_count * 100) if tweet_count > 0 else 0
        else:
            total_hashtags_used = avg_hashtags_per_tweet = tweets_with_hashtags_percentage = 0

        if 'Mentions' in user_tweets.columns:
            # Count mentions from the Mentions field (comma-separated string)
            mention_counts = user_tweets['Mentions'].astype(str).apply(
                lambda x: len([m.strip() for m in x.split(',') if m.strip()]))
            total_mentions_used = mention_counts.sum()
            avg_mentions_per_tweet = mention_counts.mean()
        elif 'Mention_Count' in user_tweets.columns:
            # Fall back to Mention_Count if available
            total_mentions_used = user_tweets['Mention_Count'].sum()
            avg_mentions_per_tweet = user_tweets['Mention_Count'].mean()
        else:
            total_mentions_used = avg_mentions_per_tweet = 0

        # Timing and activity patterns
        if 'Hour' in user_tweets.columns:
            most_active_hour = user_tweets['Hour'].mode().values[0] if len(user_tweets['Hour'].mode()) > 0 else 0
            hourly_distribution = user_tweets['Hour'].value_counts().head(3).to_dict()
        else:
            most_active_hour = 0
            hourly_distribution = {}
        if 'Day_of_Week' in user_tweets.columns:
            most_active_day = user_tweets['Day_of_Week'].mode().values[0] if len(user_tweets['Day_of_Week'].mode()) > 0 else "Unknown"
        else:
            most_active_day = "Unknown"

        # Performance metrics
        if 'Likes' in user_tweets.columns and not user_tweets.empty:
            highest_likes = user_tweets['Likes'].max()
            top_tweet_idx = user_tweets['Likes'].idxmax()
            top_tweet_text = user_tweets.loc[top_tweet_idx, 'Text'][:100] + "..." if 'Text' in user_tweets.columns else ""
            top_tweet_url = user_tweets.loc[top_tweet_idx, 'URL'] if 'URL' in user_tweets.columns else ""
            # Viral content (top 10% threshold)
            viral_threshold = user_tweets['Likes'].quantile(0.9)
            viral_tweets_count = (user_tweets['Likes'] > viral_threshold).sum()
            viral_content_percentage = (viral_tweets_count / tweet_count * 100) if tweet_count > 0 else 0
        else:
            highest_likes = viral_tweets_count = viral_content_percentage = 0
            top_tweet_text = top_tweet_url = ""

        # Audience engagement ratios
        like_to_view_ratio = (likes_count / views_count * 100) if views_count > 0 else 0
        retweet_to_like_ratio = (repost_count / likes_count * 100) if likes_count > 0 else 0
        reply_to_like_ratio = (reply_count / likes_count * 100) if likes_count > 0 else 0

        # Engagement score (weighted: likes=1, retweets=2, replies=3)
        total_engagement = likes_count + repost_count + reply_count
        engagement_score = (likes_count * 1 + repost_count * 2 + reply_count * 3) / tweet_count if tweet_count > 0 else 0

        return {
            # Basic metrics
            "likes_count": int(likes_count),
            "views_count": int(views_count),
            "reply_count": int(reply_count),
            "repost_count": int(repost_count),
            # Content quality metrics
            "avg_likes_per_tweet": round(avg_likes_per_tweet, 1),
            "avg_views_per_tweet": round(avg_views_per_tweet, 1),
            "avg_engagement_rate": round(avg_engagement_rate, 2),
            "avg_tweet_length": round(avg_tweet_length, 1),
            "longest_tweet_length": int(longest_tweet_length),
            "shortest_tweet_length": int(shortest_tweet_length),
            # Media usage metrics
            "tweets_with_media_count": int(tweets_with_media),
            "media_usage_percentage": round(media_usage_percentage, 1),
            "avg_likes_with_media": round(avg_likes_with_media, 1),
            "avg_likes_without_media": round(avg_likes_without_media, 1),
            # Hashtag and mention metrics
            "total_hashtags_used": int(total_hashtags_used),
            "avg_hashtags_per_tweet": round(avg_hashtags_per_tweet, 1),
            "tweets_with_hashtags_percentage": round(tweets_with_hashtags_percentage, 1),
            "total_mentions_used": int(total_mentions_used),
            "avg_mentions_per_tweet": round(avg_mentions_per_tweet, 1),
            # Activity patterns
            "most_active_hour": int(most_active_hour),
            "most_active_day": str(most_active_day),
            "top_activity_hours": list(hourly_distribution.keys())[:3],
            # Performance metrics
            "highest_likes": int(highest_likes),
            "top_tweet_text": str(top_tweet_text),
            "top_tweet_url": str(top_tweet_url),
            "viral_tweets_count": int(viral_tweets_count),
            "viral_content_percentage": round(viral_content_percentage, 1),
            # Engagement ratios
            "like_to_view_ratio": round(like_to_view_ratio, 2),
            "retweet_to_like_ratio": round(retweet_to_like_ratio, 2),
            "reply_to_like_ratio": round(reply_to_like_ratio, 2),
            "engagement_score": round(engagement_score, 1),
            "total_engagement": int(total_engagement),
        }
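
    # Worked example of the weighted engagement score (illustrative numbers):
    # with 100 likes, 20 retweets, and 10 replies across 5 tweets,
    #
    #     score = (100*1 + 20*2 + 10*3) / 5 = 170 / 5 = 34.0
    #
    # Replies carry the highest weight because they represent the most active
    # form of audience engagement under this scheme.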

    def _get_empty_metrics(self) -> Dict:
        """Return an all-zero metrics structure."""
        return {
            # Basic metrics
            "likes_count": 0, "views_count": 0, "reply_count": 0, "repost_count": 0,
            # Content quality metrics
            "avg_likes_per_tweet": 0, "avg_views_per_tweet": 0, "avg_engagement_rate": 0,
            "avg_tweet_length": 0, "longest_tweet_length": 0, "shortest_tweet_length": 0,
            # Media usage metrics
            "tweets_with_media_count": 0, "media_usage_percentage": 0,
            "avg_likes_with_media": 0, "avg_likes_without_media": 0,
            # Hashtag and mention metrics
            "total_hashtags_used": 0, "avg_hashtags_per_tweet": 0, "tweets_with_hashtags_percentage": 0,
            "total_mentions_used": 0, "avg_mentions_per_tweet": 0,
            # Activity patterns
            "most_active_hour": 0, "most_active_day": "Unknown", "top_activity_hours": [],
            # Performance metrics
            "highest_likes": 0, "top_tweet_text": "", "top_tweet_url": "",
            "viral_tweets_count": 0, "viral_content_percentage": 0,
            # Engagement ratios
            "like_to_view_ratio": 0, "retweet_to_like_ratio": 0, "reply_to_like_ratio": 0,
            "engagement_score": 0, "total_engagement": 0,
        }

    def _is_mock_tweet(self, item: Dict) -> bool:
        """Detect whether a tweet is mock/invalid data that should be ignored."""
        # Check for missing essential fields that real tweets should have
        essential_fields = ['createdAt', 'text', 'author']
        missing_fields = sum(1 for field in essential_fields if not item.get(field))
        # Missing multiple essential fields suggests mock data
        if missing_fields >= 2:
            return True
        # Check for empty or placeholder text
        text = item.get("text", "").strip()
        if not text or text.lower() in ["", "null", "undefined", "test", "placeholder"]:
            return True
        # Check for missing or empty author data
        author = item.get("author", {})
        if not author or not author.get("userName", "").strip():
            return True
        # Check for obviously fake/test usernames
        username = author.get("userName", "").lower()
        test_patterns = ["test", "mock", "fake", "placeholder", "example"]
        if any(pattern in username for pattern in test_patterns):
            return True
        return False
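
    # Examples of records the heuristic above rejects (illustrative inputs):
    #
    #     {"text": "", "author": {}}                      -> True (2+ fields missing)
    #     {"createdAt": "...", "text": "placeholder",
    #      "author": {"userName": "real_user"}}           -> True (placeholder text)
    #     {"createdAt": "...", "text": "hi",
    #      "author": {"userName": "mock_account_1"}}      -> True ("mock" in username)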

    def _process_single_tweet(self, item: Dict, hashtags_counter: Counter,
                              mentions_counter: Counter, all_author_data: List,
                              target_username: Optional[str] = None) -> Optional[Dict]:
        """Process a single tweet item."""
        # Extract author data
        author = item.get("author", {})
        if author:
            # Only collect author data from the target user when
            # target_username is specified; this prevents random accounts
            # from being saved in replies data.
            if target_username:
                author_username = author.get("userName", "").lower()
                if author_username == target_username.lower():
                    all_author_data.append(author)
            else:
                all_author_data.append(author)

        # Check whether this is a mock/invalid tweet (minimal or no real data)
        is_mock_tweet = self._is_mock_tweet(item)

        # Validate date information
        created_at = item.get("createdAt", "")
        if not created_at:
            # Only warn for real tweets missing dates, and only in debug mode
            if not is_mock_tweet and st.session_state.get('debug_mode', False):
                st.warning("Skipping a tweet due to missing date information")
            return None

        # Parse date
        try:
            date_obj_utc = datetime.strptime(created_at, TWITTER_DATE_FORMAT)
            date_obj_ist = convert_to_ist(date_obj_utc)
        except ValueError:
            # Only log/warn for real tweets with invalid dates
            if not is_mock_tweet:
                if st.session_state.get('debug_mode', False):
                    st.warning(f"Skipping tweet due to invalid date format: {created_at}")
                logger.warning(f"Invalid date format: {created_at}")
            return None

        # Extract text and pull out hashtags/mentions
        text = item.get("text", "")
        hashtags = [word.strip("#") for word in text.split() if word.startswith('#')]
        mentions = [word.strip("@") for word in text.split() if word.startswith('@')]

        # Update counters
        hashtags_counter.update(hashtags)
        mentions_counter.update(mentions)

        return {
            "Date": date_obj_ist.strftime("%Y-%m-%d %H:%M:%S"),
            "Date_Only": date_obj_ist.strftime("%Y-%m-%d"),
            "Hour": date_obj_ist.hour,
            "Day_of_Week": date_obj_ist.strftime("%A"),
            "Username": author.get("userName", ""),
            "Text": text,
            "Likes": item.get("likeCount", 0),
            "Retweets": item.get("retweetCount", 0),
            "Replies": item.get("replyCount", 0),
            "Views": item.get("viewCount", 0),
            "URL": item.get("url", ""),
            "Has_Media": "extendedEntities" in item,
            "Hashtags": ", ".join(hashtags),
            "Mentions": ", ".join(mentions),
        }
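
    # The hashtag/mention extraction above is a plain whitespace split
    # (doctest-style sketch with made-up text; note that trailing punctuation
    # on a tag is kept as-is):
    #
    #     >>> text = "Big day! #Election2024 thanks @someuser #India"
    #     >>> [w.strip("#") for w in text.split() if w.startswith("#")]
    #     ['Election2024', 'India']
    #     >>> [w.strip("@") for w in text.split() if w.startswith("@")]
    #     ['someuser']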

    def _extract_best_account_details(self, all_author_data: List[Dict],
                                      target_username: Optional[str] = None) -> Dict:
        """Extract the most complete account details from author data."""
        if not all_author_data:
            # With no author data but a known target username, return a
            # basic placeholder structure
            if target_username:
                return {
                    "name": target_username,
                    "username": target_username,
                    "bio": "",
                    "followers_count": 0,
                    "following_count": 0,
                    "tweet_count": 0,
                    "verified": False,
                    "profile_image_url": ""
                }
            return {}

        # Find the author object with the most complete information
        best_author = self._find_most_complete_author(all_author_data)

        # Debug information
        if st.session_state.get('debug_mode', False):
            st.write("Debug - Found", len(all_author_data), "author objects")
            st.write("Debug - Best author data keys:", list(best_author.keys()))
            st.write("Debug - Best author data sample:", {
                k: v for k, v in best_author.items()
                if k in ['name', 'userName', 'followers', 'following', 'statusesCount']
            })

        return self._standardize_account_details(best_author)

    def _find_most_complete_author(self, all_author_data: List[Dict]) -> Dict:
        """Find the author data object with the most complete information."""
        best_author = {}
        best_score = -1
        for author in all_author_data:
            score = self._calculate_author_completeness_score(author)
            if score > best_score:
                best_score = score
                best_author = author
        return best_author if best_score > 0 else (all_author_data[0] if all_author_data else {})

    def _calculate_author_completeness_score(self, author: Dict) -> int:
        """Calculate a completeness score for author data.

        safe_get_nested is used for the nested metric lookups so that a
        non-dict "publicMetrics" value cannot raise AttributeError.
        """
        score = 0
        # Follower metrics (high priority)
        followers = (author.get("followers") or author.get("followersCount") or
                     author.get("followers_count") or
                     safe_get_nested(author, ["publicMetrics", "followers_count"]) or
                     safe_get_nested(author, ["public_metrics", "followers_count"]) or 0)
        if followers > 0:
            score += 3
        following = (author.get("following") or author.get("followingCount") or
                     author.get("following_count") or author.get("friends_count") or
                     safe_get_nested(author, ["publicMetrics", "following_count"]) or
                     safe_get_nested(author, ["public_metrics", "following_count"]) or 0)
        if following > 0:
            score += 2
        tweet_count = (author.get("statusesCount") or author.get("statuses_count") or
                       author.get("tweet_count") or
                       safe_get_nested(author, ["publicMetrics", "tweet_count"]) or
                       safe_get_nested(author, ["public_metrics", "tweet_count"]) or 0)
        if tweet_count > 0:
            score += 2
        # Profile information (lower priority)
        if author.get("description") or author.get("profile_bio"):
            score += 1
        if author.get("verified") or author.get("isVerified"):
            score += 1
        return score
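
    # How the completeness score adds up (illustrative): an author object
    # carrying followers, following, a tweet count, a bio, and a verified
    # flag scores the maximum 3 + 2 + 2 + 1 + 1 = 9; an object with only a
    # username scores 0 and is used only as a last resort by
    # _find_most_complete_author.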

    def _convert_to_ist_format(self, twitter_date_str: str) -> str:
        """Convert a Twitter date string to a readable IST format."""
        if not twitter_date_str:
            return ""
        try:
            # Parse the Twitter date format: "Mon Jul 08 09:31:59 +0000 2013"
            utc_dt = datetime.strptime(twitter_date_str, TWITTER_DATE_FORMAT)
            # Convert to IST
            ist_tz = pytz.timezone(IST_TIMEZONE)
            ist_dt = utc_dt.astimezone(ist_tz)
            # Format as a readable IST date, e.g. "08 July 2013, 03:01 PM IST"
            # (%d and %I are zero-padded)
            return ist_dt.strftime("%d %B %Y, %I:%M %p IST")
        except ValueError:
            # If parsing fails, return the original string
            return twitter_date_str
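
    # Illustrative round trip (UTC 09:31 + 5:30 offset = IST 15:01):
    #
    #     "Mon Jul 08 09:31:59 +0000 2013" -> "08 July 2013, 03:01 PM IST"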

    def _standardize_account_details(self, author_data: Dict) -> Dict:
        """Standardize account details from the various possible field names."""
        # Debug: print raw author data keys (only in debug mode)
        if st.session_state.get('debug_mode', False):
            st.write(f"Debug - Author data keys: {list(author_data.keys())}")

        # Try multiple possible field names for each metric; nested lookups
        # go through safe_get_nested so missing/non-dict values cannot raise
        followers_count = (
            author_data.get("followers") or
            author_data.get("followersCount") or
            author_data.get("followers_count") or
            safe_get_nested(author_data, ["publicMetrics", "followers_count"]) or
            safe_get_nested(author_data, ["public_metrics", "followers_count"]) or
            0
        )
        following_count = (
            author_data.get("following") or
            author_data.get("followingCount") or
            author_data.get("following_count") or
            author_data.get("friends_count") or
            safe_get_nested(author_data, ["publicMetrics", "following_count"]) or
            safe_get_nested(author_data, ["public_metrics", "following_count"]) or
            0
        )
        tweet_count = (
            author_data.get("statusesCount") or
            author_data.get("statuses_count") or
            author_data.get("tweet_count") or
            safe_get_nested(author_data, ["publicMetrics", "tweet_count"]) or
            safe_get_nested(author_data, ["public_metrics", "tweet_count"]) or
            0
        )

        # Extract the account creation date
        raw_create_date = (
            author_data.get("createdAt") or
            author_data.get("created_at") or
            author_data.get("account_create_date") or
            ""
        )
        # Convert to IST format if we have a valid date
        account_create_date = self._convert_to_ist_format(raw_create_date)

        return {
            "name": author_data.get("name", ""),
            "username": author_data.get("userName", "") or author_data.get("username", ""),
            "bio": author_data.get("description", "") or author_data.get("bio", ""),
            "followers_count": followers_count,
            "following_count": following_count,
            "tweet_count": tweet_count,
            "verified": author_data.get("verified", False) or author_data.get("isVerified", False),
            "profile_image_url": author_data.get("profileImageUrl", "") or author_data.get("profile_image_url", ""),
            "account_create_date": account_create_date,
            # Engagement metrics are calculated from tweet data and merged in later
            "likes_count": 0,
            "views_count": 0,
            "reply_count": 0,
            "repost_count": 0,
        }

# =============================================================================
# UI COMPONENTS
# =============================================================================
class UIComponents:
    """Reusable UI components for the dashboard."""

    @staticmethod
    def display_account_info(account_details: Dict) -> None:
        """Display the account information section."""
        if not account_details:
            return
        st.subheader(f"👤 Account: @{account_details['username']}")
        # Profile image
        if account_details.get('profile_image_url'):
            st.image(account_details['profile_image_url'], width=80)
        # Account name and verification
        verification_badge = '✅' if account_details.get('verified') else ''
        st.markdown(f"**{account_details.get('name')}** {verification_badge}")
        # Bio
        if account_details.get('bio'):
            st.caption(account_details.get('bio'))
        # Metrics
        UIComponents._display_account_metrics(account_details)
        st.divider()

    @staticmethod
    def _display_account_metrics(account_details: Dict) -> None:
        """Display account metrics (followers, following, posts)."""
        # Account creation date
        create_date = account_details.get('account_create_date', '')
        if create_date:
            st.caption(f"📅 Account created: {create_date}")

        # Basic metrics
        m1, m2, m3 = st.columns(3)
        followers = account_details.get('followers_count', 0)
        following = account_details.get('following_count', 0)
        posts = account_details.get('tweet_count', 0)
        m1.metric(
            "Followers",
            format_large_number(followers),
            help="Follower count from Twitter API"
        )
        m2.metric(
            "Following",
            format_large_number(following),
            help="Following count from Twitter API"
        )
        m3.metric(
            "Total Posts",
            format_large_number(posts),
            help="Total tweet count from Twitter API"
        )

        # Engagement metrics
        likes = account_details.get('likes_count', 0)
        views = account_details.get('views_count', 0)
        replies = account_details.get('reply_count', 0)
        reposts = account_details.get('repost_count', 0)
        if likes > 0 or views > 0 or replies > 0 or reposts > 0:
            st.caption("**📊 Total Engagement:**")
            e1, e2, e3, e4 = st.columns(4)
            e1.metric(
                "Likes",
                format_large_number(likes),
                help="Total likes count"
            )
            e2.metric(
                "Views",
                format_large_number(views),
                help="Total views/impressions count"
            )
            e3.metric(
                "Replies",
                format_large_number(replies),
                help="Total replies count"
            )
            e4.metric(
                "Reposts",
                format_large_number(reposts),
                help="Total reposts/retweets count"
            )

        # Advanced metrics sections
        UIComponents._display_content_quality_metrics(account_details)
        UIComponents._display_media_usage_metrics(account_details)
        UIComponents._display_activity_patterns(account_details)
        UIComponents._display_performance_metrics(account_details)
        UIComponents._display_engagement_ratios(account_details)

        # Warning for missing data
        if followers == 0 and following == 0 and posts == 0:
            st.warning("⚠️ Account metrics unavailable - this may be due to API limitations or account privacy settings")

    @staticmethod
    def _display_content_quality_metrics(account_details: Dict) -> None:
        """Display content quality metrics."""
        avg_likes = account_details.get('avg_likes_per_tweet', 0)
        avg_views = account_details.get('avg_views_per_tweet', 0)
        engagement_rate = account_details.get('avg_engagement_rate', 0)
        avg_length = account_details.get('avg_tweet_length', 0)
        if avg_likes > 0 or avg_views > 0 or engagement_rate > 0:
            st.caption("**📝 Content Quality:**")
            q1, q2, q3, q4 = st.columns(4)
            q1.metric(
                "Avg Likes/Tweet",
                f"{avg_likes:.1f}",
                help="Average likes per tweet"
            )
            q2.metric(
                "Avg Views/Tweet",
                format_large_number(int(avg_views)),
                help="Average views per tweet"
            )
            q3.metric(
                "Engagement Rate",
                f"{engagement_rate:.1f}%",
                help="(Likes + Retweets) / Views * 100"
            )
            q4.metric(
                "Avg Tweet Length",
                f"{avg_length:.0f} chars",
                help="Average character length per tweet"
            )

    @staticmethod
    def _display_media_usage_metrics(account_details: Dict) -> None:
        """Display media usage metrics."""
        media_count = account_details.get('tweets_with_media_count', 0)
        media_percentage = account_details.get('media_usage_percentage', 0)
        likes_with_media = account_details.get('avg_likes_with_media', 0)
        likes_without_media = account_details.get('avg_likes_without_media', 0)
        if media_count > 0 or media_percentage > 0:
            st.caption("**🎬 Media Usage:**")
            m1, m2, m3, m4 = st.columns(4)
            m1.metric(
                "Tweets with Media",
                f"{media_count}",
                help="Number of tweets with media attachments"
            )
            m2.metric(
                "Media Usage",
                f"{media_percentage:.1f}%",
                help="Percentage of tweets with media"
            )
            m3.metric(
                "Avg Likes (Media)",
                f"{likes_with_media:.1f}",
                help="Average likes for tweets with media"
            )
            m4.metric(
                "Avg Likes (No Media)",
                f"{likes_without_media:.1f}",
                help="Average likes for tweets without media"
            )

    @staticmethod
    def _display_activity_patterns(account_details: Dict) -> None:
        """Display activity pattern metrics."""
        most_active_hour = account_details.get('most_active_hour', 0)
        most_active_day = account_details.get('most_active_day', 'Unknown')
        top_hours = account_details.get('top_activity_hours', [])
        if most_active_hour > 0 or most_active_day != 'Unknown':
            st.caption("**⏰ Activity Patterns:**")
            a1, a2, a3, a4 = st.columns(4)
            a1.metric(
                "Most Active Hour",
                f"{most_active_hour}:00",
                help="Hour of day with the most tweets"
            )
            a2.metric(
                "Most Active Day",
                most_active_day,
                help="Day of week with the most tweets"
            )
            a3.metric(
                "Top Hours",
                ", ".join([f"{h}:00" for h in top_hours[:2]]),
                help="Top active hours"
            )
            # Hashtag usage
            hashtags = account_details.get('total_hashtags_used', 0)
            a4.metric(
                "Hashtags Used",
                f"{hashtags}",
                help="Total hashtags used in tweets"
            )

    @staticmethod
    def _display_performance_metrics(account_details: Dict) -> None:
        """Display performance metrics."""
        highest_likes = account_details.get('highest_likes', 0)
        viral_count = account_details.get('viral_tweets_count', 0)
        viral_percentage = account_details.get('viral_content_percentage', 0)
        top_tweet_text = account_details.get('top_tweet_text', '')
        top_tweet_url = account_details.get('top_tweet_url', '')
        if highest_likes > 0 or viral_count > 0:
            st.caption("**🏆 Performance:**")
            p1, p2, p3, p4 = st.columns(4)
            p1.metric(
                "Highest Likes",
                format_large_number(highest_likes),
                help="Most likes on a single tweet"
            )
            p2.metric(
                "Viral Tweets",
                f"{viral_count}",
                help="Tweets in the top 10% by likes"
            )
            p3.metric(
                "Viral Content %",
                f"{viral_percentage:.1f}%",
                help="Percentage of viral tweets"
            )
            p4.metric(
                "Engagement Score",
                f"{account_details.get('engagement_score', 0):.1f}",
                help="Weighted engagement score (likes×1 + retweets×2 + replies×3)"
            )
            # Show the top tweet if available
            if top_tweet_text and top_tweet_url:
                st.caption("**🏆 Top Performing Tweet:**")
                with st.expander("View top tweet"):
                    st.write(f"**Likes:** {format_large_number(highest_likes)}")
                    st.write(f"**Text:** {top_tweet_text}")
                    st.write(f"**URL:** {top_tweet_url}")

    @staticmethod
    def _display_engagement_ratios(account_details: Dict) -> None:
        """Display engagement ratio metrics."""
        like_to_view = account_details.get('like_to_view_ratio', 0)
        retweet_to_like = account_details.get('retweet_to_like_ratio', 0)
        reply_to_like = account_details.get('reply_to_like_ratio', 0)
        total_engagement = account_details.get('total_engagement', 0)
        if like_to_view > 0 or retweet_to_like > 0 or reply_to_like > 0:
            st.caption("**📈 Engagement Ratios:**")
            r1, r2, r3, r4 = st.columns(4)
            r1.metric(
                "Like Rate",
                f"{like_to_view:.2f}%",
                help="Likes per view percentage"
            )
            r2.metric(
                "Retweet Rate",
                f"{retweet_to_like:.2f}%",
                help="Retweets per like percentage"
            )
            r3.metric(
                "Reply Rate",
                f"{reply_to_like:.2f}%",
                help="Replies per like percentage"
            )
            r4.metric(
                "Total Engagement",
                format_large_number(total_engagement),
                help="Total likes + retweets + replies"
            )

    @staticmethod
    def display_key_metrics(df: pd.DataFrame) -> None:
        """Display key engagement metrics."""
        if df.empty:
            return
        st.subheader("📊 Key Metrics")
        # Basic metrics
        c1, c2, c3 = st.columns(3)
        c1.metric("Total Tweets Scanned", f"{len(df):,}")
        c2.metric("Total Likes", f"{df['Likes'].sum():,}")
        c3.metric("Total Retweets", f"{df['Retweets'].sum():,}")

        # Engagement metrics
        st.subheader("⚡ Engagement")
        df_copy = df.copy()
        df_copy["Engagement"] = df_copy["Likes"] + df_copy["Retweets"] + df_copy["Replies"]
        total_engagement = df_copy["Engagement"].sum()
        avg_engagement = total_engagement / len(df) if len(df) > 0 else 0
        total_views = df["Views"].sum()
        engagement_rate = (total_engagement / total_views * 100) if total_views > 0 else 0
        e1, e2 = st.columns(2)
        e1.metric("Avg. Engagement/Tweet", f"{avg_engagement:.1f}")
        e2.metric("Engagement Rate (vs Views)", f"{engagement_rate:.2f}%")
        st.divider()

    @staticmethod
    def display_content_analysis(metrics: Dict) -> None:
        """Display the content analysis section."""
        st.subheader("🔍 Content Analysis")
        top_hashtags = metrics.get("top_hashtags", [])
        top_mentions = metrics.get("top_mentions", [])
        if top_hashtags:
            st.markdown("**Top Hashtags**")
            st.write(", ".join([f"`#{tag}` ({count})" for tag, count in top_hashtags]))
        if top_mentions:
            st.markdown("**Top Mentions**")
            st.write(", ".join([f"`@{user}` ({count})" for user, count in top_mentions]))

    @staticmethod
    def display_ai_summary(gemini_summary: Optional[str]) -> None:
        """Display the AI-generated summary section."""
        if gemini_summary:
            st.subheader("🧠 AI Summary & Recommendations")
            st.markdown(gemini_summary)
            st.divider()

    @staticmethod
    def display_most_engaging_tweet(df: pd.DataFrame) -> None:
        """Display the most engaging tweet."""
        if df.empty:
            return
        st.subheader("🏆 Most Engaging Tweet")
        df_copy = df.copy()
        df_copy["Engagement"] = df_copy["Likes"] + df_copy["Retweets"] + df_copy["Replies"]
        most_engaging = df_copy.loc[df_copy["Engagement"].idxmax()]
        with st.container(border=True):
            st.markdown(f"**{most_engaging['Text']}**")
            stats = (f"❤️ {most_engaging['Likes']} | 🔁 {most_engaging['Retweets']} | "
                     f"💬 {most_engaging['Replies']} | 👁️ {most_engaging['Views']}")
            st.markdown(f"**{stats}** | [{most_engaging['Date']}]({most_engaging['URL']})")
        st.divider()

    @staticmethod
    def display_charts(df: pd.DataFrame) -> None:
        """Display data visualization charts."""
        if df.empty:
            return
        st.subheader("📈 Posting Patterns")
        # Tweets by day
        df_by_day = df.groupby('Date_Only')['Text'].count().reset_index()
        df_by_day['Date_Only'] = pd.to_datetime(df_by_day['Date_Only'])
        fig_day = px.line(
            df_by_day,
            x='Date_Only',
            y='Text',
            title="Tweets per Day",
            labels={'Date_Only': 'Date', 'Text': 'Count'}
        )
        st.plotly_chart(fig_day, use_container_width=True)

    @staticmethod
    def display_data_download(df: pd.DataFrame) -> None:
        """Display the raw data table with a download option."""
        st.subheader("📋 Raw Data")
        st.dataframe(df)
        if not df.empty:
            csv = df.to_csv(index=False).encode('utf-8')
            st.download_button(
                "📥 Download as CSV",
                csv,
                f"twitter_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                "text/csv",
                key="download-csv",
                use_container_width=True
            )

# =============================================================================
# DASHBOARD MANAGEMENT
# =============================================================================
class TwitterDashboard:
    """Main dashboard for displaying Twitter analysis results."""

    def __init__(self, df: pd.DataFrame, metrics: Dict, dataset_id: str,
                 analysis_type: str = "Account's Tweets", gemini_summary: Optional[str] = None):
        self.df = df
        self.metrics = metrics
        self.dataset_id = dataset_id
        self.analysis_type = analysis_type
        self.gemini_summary = gemini_summary

    def render(self) -> None:
        """Render the complete dashboard."""
        if self.df.empty:
            st.warning("No data available to display.")
            return

        # Main layout
        left_col, right_col = st.columns([1, 1], gap="large")
        with left_col:
            # Only show account info for "Account's Tweets" analysis
            if self.analysis_type == "Account's Tweets":
                UIComponents.display_account_info(self.metrics.get("account_details", {}))
            else:
                # For "Comments to Account", show a different header
                st.subheader("💬 Comments Analysis")
                st.info("Analyzing comments and replies directed to the account")
                st.divider()
            UIComponents.display_key_metrics(self.df)
            UIComponents.display_content_analysis(self.metrics)
        with right_col:
            UIComponents.display_ai_summary(self.gemini_summary)
            UIComponents.display_most_engaging_tweet(self.df)
            UIComponents.display_charts(self.df)

        # Full-width sections
        UIComponents.display_data_download(self.df)

# =============================================================================
# SCHEDULER MANAGEMENT
# =============================================================================
class SchedulerManager:
    """Manages scheduled users and automation settings."""

    def __init__(self, db: DatabaseManager):
        self.db = db

    def render_controls(self) -> None:
        """Render the scheduler management interface."""
        st.header("📅 Scheduler Management")
        if not self.db.is_connected:
            st.warning("⚠️ Database not connected. Scheduler features unavailable.")
            return
        self._display_current_users()
        st.divider()
        self._display_add_user_form()
        st.divider()
        self._display_scheduler_info()

    def _display_current_users(self) -> None:
        """Display currently scheduled users."""
        st.subheader("📋 Current Scheduled Users")
        try:
            scheduled_users = list(self.db.scheduler_users_collection.find({"active": True}))
            usernames = [user["username"] for user in scheduled_users]
        except Exception as e:
            st.error(f"Error fetching scheduled users: {e}")
            return
        if usernames:
            for username in usernames:
                col1, col2 = st.columns([3, 1])
                with col1:
                    st.write(f"@{username}")
                with col2:
                    if st.button("🗑️", key=f"remove_{username}", help=f"Remove @{username}"):
                        if self._remove_user(username):
                            st.rerun()
        else:
            st.info("No users currently scheduled.")

    def _display_add_user_form(self) -> None:
        """Display the form to add new users."""
        st.subheader("➕ Add New User")
        new_username = st.text_input("Username to schedule (without @)", key="new_scheduled_user")
        col1, col2 = st.columns(2)
        with col1:
            if st.button("Add User", use_container_width=True, disabled=not new_username):
                if self._add_user(new_username):
                    st.success(f"✅ Added @{new_username} to scheduler")
                    st.rerun()
        with col2:
            if st.button("🔄 Refresh List", use_container_width=True):
                st.rerun()

    def _display_scheduler_info(self) -> None:
        """Display scheduler information."""
        st.subheader("ℹ️ Scheduler Info")
        st.info("""
        **GitHub Actions Automation:**
        - Runs daily at 12:00 AM IST automatically
        - Can be triggered manually from the GitHub Actions tab
        - Scrapes only the previous day's data (no overlap)
        - Stores results in MongoDB with duplicate detection
        """)

    def _add_user(self, username: str) -> bool:
        """Add a user to the scheduled scraping list."""
        try:
            # Check whether the user is already scheduled
            existing_users = list(self.db.scheduler_users_collection.find({"active": True}))
            if username in [user["username"] for user in existing_users]:
                st.warning("User already scheduled")
                return False
            user_doc = {
                "username": username,
                "active": True,
                "added_at": datetime.utcnow(),
                "last_scraped": None
            }
            self.db.scheduler_users_collection.update_one(
                {"username": username},
                {"$set": user_doc},
                upsert=True
            )
            return True
        except Exception as e:
            st.error(f"Error adding user: {e}")
            return False

    def _remove_user(self, username: str) -> bool:
        """Remove a user from the scheduled scraping list."""
        try:
            self.db.scheduler_users_collection.update_one(
                {"username": username},
                {"$set": {"active": False}}
            )
            return True
        except Exception as e:
            st.error(f"Error removing user: {e}")
            return False
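
# Removal above is a soft delete (active=False), and _add_user upserts on the
# username key, so re-adding a removed user reactivates the same document
# instead of creating a duplicate. Rough MongoDB shell equivalent of the
# upsert (illustrative username):
#
#     db.scheduler_users.updateOne(
#         { username: "someuser" },
#         { $set: { username: "someuser", active: true, last_scraped: null } },
#         { upsert: true }
#     )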

# =============================================================================
# MAIN APPLICATION
# =============================================================================
class TwitterAnalyzerApp:
    """Main Twitter Analyzer application."""

    def __init__(self):
        self._setup_page()
        self._initialize_services()

    def _setup_page(self) -> None:
        """Configure Streamlit page settings."""
        st.set_page_config(**PAGE_CONFIG)
        st.title("🐦 Twitter Content Analyzer")

    def _initialize_services(self) -> None:
        """Initialize all required services."""
        try:
            self.config = AppConfig()
            self.db = DatabaseManager(self.config.mongodb_uri)
            self.apify = ApifyService(self.config.apify_api_key)
            self.gemini = GeminiService(self.config.gemini_api_key) if self.config.gemini_api_key else None
            self.processor = TweetDataProcessor()
            self.scheduler = SchedulerManager(self.db)
        except ValueError as e:
            st.error(f"Initialization failed: {e}. Please check your .env.local file.")
            st.stop()

    def run(self) -> None:
        """Execute the main application."""
        self._render_sidebar()
        if not getattr(self, 'run_button', False) or not self.username:
            st.info("Please enter a Twitter username and click 'Analyze' to begin.")
            return
        self._perform_analysis()

    def _render_sidebar(self) -> None:
        """Render the application sidebar."""
        with st.sidebar:
            self._render_analysis_controls()
            self._render_debug_options()
            st.divider()
            self.scheduler.render_controls()

    def _render_analysis_controls(self) -> None:
        """Render analysis control widgets."""
        st.header("⚙️ Analysis Controls")
        self.analysis_type = st.radio(
            "Analysis Type",
            ["Account's Tweets", "Comments to Account"],
            horizontal=True
        )
        self.username = st.text_input("Twitter Username (without @)", DEFAULT_USERNAME)
        # Date inputs
        today = datetime.now()
        last_week = today - timedelta(days=DEFAULT_DAYS_BACK)
        self.since_date = st.date_input("Start Date", last_week)
        self.until_date = st.date_input("End Date", today)
        self.run_button = st.button("🚀 Analyze", use_container_width=True, type="primary")

    def _render_debug_options(self) -> None:
        """Render debug options."""
        with st.expander("🔧 Debug Options"):
            st.session_state['debug_mode'] = st.checkbox(
                "Show API Debug Info",
                help="Shows raw API data for troubleshooting"
            )

    def _perform_analysis(self) -> None:
        """Perform the main analysis workflow."""
        since_str = self.since_date.strftime("%Y-%m-%d")
        until_str = self.until_date.strftime("%Y-%m-%d")
        # Fetch data based on the analysis type
        try:
            if self.analysis_type == "Account's Tweets":
                raw_data, dataset_id = self.apify.fetch_account_tweets(self.username, since_str, until_str)
                context = f"This is an analysis of tweets by the Twitter account @{self.username}."
            else:
                raw_data, dataset_id = self.apify.fetch_account_comments(self.username, since_str, until_str)
                context = f"This is an analysis of comments/replies sent to the Twitter account @{self.username}."

            if not raw_data:
                st.error("No data was returned from the API. The account may be private, have no tweets in the selected range, or there might be an API issue.")
                return

            # Process data
            df, metrics = self.processor.process_tweets(raw_data, self.username)

            # Generate the AI summary if available
            gemini_summary = None
            if self.gemini:
                gemini_summary = self.gemini.generate_analysis(df.head(100), context)
            else:
                st.warning("GEMINI_API_KEY not found. AI summary will be skipped.")

            # Display results
            dashboard = TwitterDashboard(df, metrics, dataset_id, self.analysis_type, gemini_summary)
            dashboard.render()
        except Exception as e:
            logger.error(f"Analysis failed: {e}")
            st.error(f"Analysis failed: {str(e)}")


# =============================================================================
# APPLICATION ENTRY POINT
# =============================================================================
def main():
    """Application entry point."""
    app = TwitterAnalyzerApp()
    app.run()


if __name__ == "__main__":
    main()