scrape / src /streamlit_app.py
poemsforaphrodite's picture
Update src/streamlit_app.py
3de2787 verified
#!/usr/bin/env python3
"""
Twitter Content Analyzer
A comprehensive Twitter data collection and analysis tool with automated scheduling capabilities.
"""
import os
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from collections import Counter
import streamlit as st
import pandas as pd
import plotly.express as px
import pytz
from pymongo import MongoClient
import google.generativeai as genai
from apify_client import ApifyClient
from dotenv import load_dotenv
# =============================================================================
# CONSTANTS
# =============================================================================
DEFAULT_USERNAME = "narendramodi"
DEFAULT_DAYS_BACK = 7
IST_TIMEZONE = 'Asia/Kolkata'
UTC_TIMEZONE = 'UTC'
# Twitter API date format
TWITTER_DATE_FORMAT = "%a %b %d %H:%M:%S %z %Y"
# MongoDB collection names
TWEETS_COLLECTION = "tweets"
SCHEDULER_USERS_COLLECTION = "scheduler_users"
# Streamlit page config
PAGE_CONFIG = {
"page_title": "Twitter Scraper & Analyzer",
"page_icon": "🐦",
"layout": "wide",
"initial_sidebar_state": "expanded"
}
# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def convert_to_ist(utc_dt: datetime) -> datetime:
"""Convert UTC datetime to Indian Standard Time."""
if utc_dt.tzinfo is None:
utc_dt = pytz.utc.localize(utc_dt)
return utc_dt.astimezone(pytz.timezone(IST_TIMEZONE))
def safe_get_nested(data: Dict, keys: List[str], default=None):
"""Safely get nested dictionary values."""
for key in keys:
if isinstance(data, dict) and key in data:
data = data[key]
else:
return default
return data
def format_large_number(num: int) -> str:
"""Format large numbers with commas."""
return f"{num:,}" if num > 0 else "N/A"
# =============================================================================
# CONFIGURATION MANAGEMENT
# =============================================================================
class AppConfig:
"""Centralized configuration management."""
def __init__(self, env_path: str = ".env.local"):
load_dotenv(dotenv_path=env_path)
self._validate_config()
@property
def mongodb_uri(self) -> Optional[str]:
return os.getenv("MONGODB_URI")
@property
def apify_api_key(self) -> Optional[str]:
return os.getenv("APIFY_API_KEY")
@property
def gemini_api_key(self) -> Optional[str]:
return os.getenv("GEMINI_API_KEY")
def _validate_config(self) -> None:
"""Validate essential configuration."""
if not self.apify_api_key:
raise ValueError("APIFY_API_KEY is required but not found in environment variables")
# =============================================================================
# DATABASE MANAGEMENT
# =============================================================================
class DatabaseManager:
"""Handles all MongoDB operations."""
def __init__(self, uri: Optional[str]):
self.client = None
self.db = None
self.is_connected = False
self._connect(uri)
def _connect(self, uri: Optional[str]) -> None:
"""Establish MongoDB connection."""
if not uri:
logger.warning("No MongoDB URI provided. Running in offline mode.")
self._setup_dummy_collections()
return
try:
self.client = MongoClient(uri, serverSelectionTimeoutMS=5000)
self.client.admin.command('ping')
self.db = self.client["DataCollector"]
self.tweets_collection = self.db[TWEETS_COLLECTION]
self.scheduler_users_collection = self.db[SCHEDULER_USERS_COLLECTION]
self.is_connected = True
logger.info("βœ… MongoDB connected successfully")
except Exception as e:
logger.error(f"⚠️ MongoDB connection failed: {e}")
logger.info("πŸ”„ Running in offline mode - data will not be stored")
self._setup_dummy_collections()
def _setup_dummy_collections(self) -> None:
"""Setup dummy collections for offline mode."""
class DummyCollection:
def update_one(self, *args, **kwargs): pass
def find(self, *args, **kwargs): return []
def find_one(self, *args, **kwargs): return None
def insert_one(self, *args, **kwargs): pass
self.tweets_collection = DummyCollection()
self.scheduler_users_collection = DummyCollection()
self.is_connected = False
# =============================================================================
# API SERVICES
# =============================================================================
class ApifyService:
"""Handles Apify API interactions for Twitter data collection."""
ACTOR_ID = "CJdippxWmn9uRfooo"
def __init__(self, api_key: str):
self.client = ApifyClient(api_key)
def _run_actor(self, run_input: Dict[str, Any]) -> Tuple[List[Dict], str]:
"""Execute Apify actor and retrieve dataset."""
try:
run = self.client.actor(self.ACTOR_ID).call(run_input=run_input)
dataset_id = run["defaultDatasetId"]
data = list(self.client.dataset(dataset_id).iterate_items())
return data, dataset_id
except Exception as e:
logger.error(f"Apify actor execution failed: {e}")
raise
def fetch_account_tweets(self, username: str, since: str, until: str) -> Tuple[List[Dict], str]:
"""Fetch tweets posted by a specific account."""
# Handle both simple date (YYYY-MM-DD) and full timestamp (YYYY-MM-DD_HH:MM:SS) formats
since_formatted = f"{since}_UTC" if "_" in since else f"{since}_00:00:00_UTC"
until_formatted = f"{until}_UTC" if "_" in until else f"{until}_23:59:59_UTC"
run_input = {
"from": username.strip(),
"since": since_formatted,
"until": until_formatted,
"queryType": "Latest",
"include:nativeretweets": True,
}
with st.spinner(f"Fetching tweets for @{username} from {since} to {until}..."):
data, dataset_id = self._run_actor(run_input)
st.info(f"πŸ” Query Details: from:{username} | Raw results: {len(data)} tweets")
return data, dataset_id
def fetch_account_comments(self, username: str, since: str, until: str) -> Tuple[List[Dict], str]:
"""Fetch comments/replies directed to a specific account."""
# Handle both simple date (YYYY-MM-DD) and full timestamp (YYYY-MM-DD_HH:MM:SS) formats
since_formatted = f"{since}_UTC" if "_" in since else f"{since}_00:00:00_UTC"
until_formatted = f"{until}_UTC" if "_" in until else f"{until}_23:59:59_UTC"
run_input = {
"to": username.strip(),
"since": since_formatted,
"until": until_formatted,
"queryType": "Latest",
}
with st.spinner(f"Fetching comments for @{username} from {since} to {until}..."):
data, dataset_id = self._run_actor(run_input)
st.info(f"πŸ” Query Details: to:@{username} | Raw results: {len(data)} comments")
return data, dataset_id
class GeminiService:
"""Handles Google Generative AI interactions."""
def __init__(self, api_key: str):
genai.configure(api_key=api_key)
self.model = genai.GenerativeModel('gemini-1.5-flash')
def generate_analysis(self, tweets_df: pd.DataFrame, context: str) -> str:
"""Generate AI-powered analysis of tweets."""
if tweets_df.empty:
return "No tweets provided for analysis."
with st.spinner("Generating AI summary with Gemini..."):
try:
tweets_text = self._format_tweets_for_analysis(tweets_df)
prompt = self._create_analysis_prompt(context, tweets_text)
response = self.model.generate_content(prompt)
return response.text
except Exception as e:
logger.error(f"Gemini analysis failed: {e}")
return f"Error generating summary: {str(e)}"
def _format_tweets_for_analysis(self, tweets_df: pd.DataFrame) -> str:
"""Format tweets for AI analysis."""
return "\n\n".join([
f"{i}. @{row.Username}: {row.Text} (Likes: {row.Likes}, Retweets: {row.Retweets})"
for i, row in enumerate(tweets_df.itertuples(), 1)
])
def _create_analysis_prompt(self, context: str, tweets_text: str) -> str:
"""Create analysis prompt for Gemini."""
return f"""
{context}
Here are the tweets to analyze:
{tweets_text}
Please provide a comprehensive analysis covering:
1. **Main Themes & Topics:** What are the key subjects of discussion?
2. **Overall Sentiment:** What is the general tone (positive, negative, neutral)?
3. **Key Insights & Patterns:** Are there any notable trends or surprising findings?
4. **Top Recommendations:** Provide 5 actionable suggestions for the brand/party to improve their strategy based on this feedback.
Format the response clearly using Markdown.
"""
# =============================================================================
# DATA PROCESSING
# =============================================================================
class TweetDataProcessor:
"""Processes raw tweet data into structured format."""
def process_tweets(self, raw_data: List[Dict[str, Any]], target_username: str = None) -> Tuple[pd.DataFrame, Dict[str, Any]]:
"""Transform raw API data into clean DataFrame and metrics."""
processed_data = []
hashtags_counter = Counter()
mentions_counter = Counter()
all_author_data = []
skipped_count = 0
error_count = 0
for item in raw_data:
try:
processed_tweet = self._process_single_tweet(item, hashtags_counter, mentions_counter, all_author_data, target_username)
if processed_tweet:
processed_data.append(processed_tweet)
else:
skipped_count += 1
except Exception as e:
error_count += 1
# Only log individual errors in debug mode
if st.session_state.get('debug_mode', False):
logger.warning(f"Skipping tweet due to processing error: {e}")
st.warning(f"Skipping a tweet due to processing error: {e}")
# Show summary of skipped items only if significant
if skipped_count > 0 and st.session_state.get('debug_mode', False):
st.info(f"ℹ️ Skipped {skipped_count} items (likely mock/invalid data)")
if error_count > 0:
st.warning(f"⚠️ {error_count} items had processing errors")
# Extract best account details
account_details = self._extract_best_account_details(all_author_data, target_username)
# Create DataFrame and calculate engagement metrics from tweet data
df = pd.DataFrame(processed_data)
engagement_metrics = self._calculate_engagement_metrics(df, target_username)
# Add engagement metrics to account_details
if account_details:
account_details.update(engagement_metrics)
metrics = {
"top_hashtags": hashtags_counter.most_common(5),
"top_mentions": mentions_counter.most_common(5),
"account_details": account_details
}
return df, metrics
def _calculate_engagement_metrics(self, df: pd.DataFrame, target_username: str = None) -> Dict:
"""Calculate comprehensive engagement metrics from tweet data."""
if df.empty:
return self._get_empty_metrics()
# Filter to only tweets from the target user if specified
if target_username:
user_tweets = df[df['Username'].str.lower() == target_username.lower()]
else:
user_tweets = df
if user_tweets.empty:
return self._get_empty_metrics()
# Basic engagement totals
likes_count = user_tweets['Likes'].sum() if 'Likes' in user_tweets.columns else 0
views_count = user_tweets['Views'].sum() if 'Views' in user_tweets.columns else 0
reply_count = user_tweets['Replies'].sum() if 'Replies' in user_tweets.columns else 0
repost_count = user_tweets['Retweets'].sum() if 'Retweets' in user_tweets.columns else 0
tweet_count = len(user_tweets)
# Content quality metrics
avg_likes_per_tweet = likes_count / tweet_count if tweet_count > 0 else 0
avg_views_per_tweet = views_count / tweet_count if tweet_count > 0 else 0
avg_engagement_rate = ((likes_count + repost_count) / views_count * 100) if views_count > 0 else 0
# Content length analysis
if 'Text' in user_tweets.columns:
text_lengths = user_tweets['Text'].astype(str).str.len()
avg_tweet_length = text_lengths.mean()
longest_tweet_length = text_lengths.max()
shortest_tweet_length = text_lengths.min()
else:
avg_tweet_length = longest_tweet_length = shortest_tweet_length = 0
# Media usage metrics
if 'Has_Media' in user_tweets.columns:
tweets_with_media = user_tweets['Has_Media'].sum()
media_usage_percentage = (tweets_with_media / tweet_count * 100) if tweet_count > 0 else 0
# Media effectiveness
media_tweets = user_tweets[user_tweets['Has_Media'] == True]
no_media_tweets = user_tweets[user_tweets['Has_Media'] == False]
avg_likes_with_media = media_tweets['Likes'].mean() if len(media_tweets) > 0 else 0
avg_likes_without_media = no_media_tweets['Likes'].mean() if len(no_media_tweets) > 0 else 0
else:
tweets_with_media = media_usage_percentage = 0
avg_likes_with_media = avg_likes_without_media = 0
# Hashtag and mention analysis
if 'Hashtags' in user_tweets.columns:
# Count hashtags from the Hashtags field (comma-separated string)
hashtag_counts = user_tweets['Hashtags'].astype(str).apply(lambda x: len([h.strip() for h in x.split(',') if h.strip()]))
total_hashtags_used = hashtag_counts.sum()
avg_hashtags_per_tweet = hashtag_counts.mean()
tweets_with_hashtags_percentage = ((hashtag_counts > 0).sum() / tweet_count * 100) if tweet_count > 0 else 0
elif 'Hashtag_Count' in user_tweets.columns:
# Fallback to Hashtag_Count if available
total_hashtags_used = user_tweets['Hashtag_Count'].sum()
avg_hashtags_per_tweet = user_tweets['Hashtag_Count'].mean()
tweets_with_hashtags_percentage = ((user_tweets['Hashtag_Count'] > 0).sum() / tweet_count * 100) if tweet_count > 0 else 0
else:
total_hashtags_used = avg_hashtags_per_tweet = tweets_with_hashtags_percentage = 0
if 'Mentions' in user_tweets.columns:
# Count mentions from the Mentions field (comma-separated string)
mention_counts = user_tweets['Mentions'].astype(str).apply(lambda x: len([m.strip() for m in x.split(',') if m.strip()]))
total_mentions_used = mention_counts.sum()
avg_mentions_per_tweet = mention_counts.mean()
elif 'Mention_Count' in user_tweets.columns:
# Fallback to Mention_Count if available
total_mentions_used = user_tweets['Mention_Count'].sum()
avg_mentions_per_tweet = user_tweets['Mention_Count'].mean()
else:
total_mentions_used = avg_mentions_per_tweet = 0
# Timing and activity patterns
if 'Hour' in user_tweets.columns:
most_active_hour = user_tweets['Hour'].mode().values[0] if len(user_tweets['Hour'].mode()) > 0 else 0
hourly_distribution = user_tweets['Hour'].value_counts().head(3).to_dict()
else:
most_active_hour = 0
hourly_distribution = {}
if 'Day_of_Week' in user_tweets.columns:
most_active_day = user_tweets['Day_of_Week'].mode().values[0] if len(user_tweets['Day_of_Week'].mode()) > 0 else "Unknown"
else:
most_active_day = "Unknown"
# Performance metrics
if 'Likes' in user_tweets.columns and not user_tweets.empty:
highest_likes = user_tweets['Likes'].max()
top_tweet_idx = user_tweets['Likes'].idxmax()
top_tweet_text = user_tweets.loc[top_tweet_idx, 'Text'][:100] + "..." if 'Text' in user_tweets.columns else ""
top_tweet_url = user_tweets.loc[top_tweet_idx, 'URL'] if 'URL' in user_tweets.columns else ""
# Viral content (top 10% threshold)
viral_threshold = user_tweets['Likes'].quantile(0.9)
viral_tweets_count = (user_tweets['Likes'] > viral_threshold).sum()
viral_content_percentage = (viral_tweets_count / tweet_count * 100) if tweet_count > 0 else 0
else:
highest_likes = viral_tweets_count = viral_content_percentage = 0
top_tweet_text = top_tweet_url = ""
# Audience engagement ratios
like_to_view_ratio = (likes_count / views_count * 100) if views_count > 0 else 0
retweet_to_like_ratio = (repost_count / likes_count * 100) if likes_count > 0 else 0
reply_to_like_ratio = (reply_count / likes_count * 100) if likes_count > 0 else 0
# Engagement score (weighted: likes=1, retweets=2, replies=3)
total_engagement = likes_count + repost_count + reply_count
engagement_score = (likes_count * 1 + repost_count * 2 + reply_count * 3) / tweet_count if tweet_count > 0 else 0
return {
# Basic metrics
"likes_count": int(likes_count),
"views_count": int(views_count),
"reply_count": int(reply_count),
"repost_count": int(repost_count),
# Content quality metrics
"avg_likes_per_tweet": round(avg_likes_per_tweet, 1),
"avg_views_per_tweet": round(avg_views_per_tweet, 1),
"avg_engagement_rate": round(avg_engagement_rate, 2),
"avg_tweet_length": round(avg_tweet_length, 1),
"longest_tweet_length": int(longest_tweet_length),
"shortest_tweet_length": int(shortest_tweet_length),
# Media usage metrics
"tweets_with_media_count": int(tweets_with_media),
"media_usage_percentage": round(media_usage_percentage, 1),
"avg_likes_with_media": round(avg_likes_with_media, 1),
"avg_likes_without_media": round(avg_likes_without_media, 1),
# Hashtag and mention metrics
"total_hashtags_used": int(total_hashtags_used),
"avg_hashtags_per_tweet": round(avg_hashtags_per_tweet, 1),
"tweets_with_hashtags_percentage": round(tweets_with_hashtags_percentage, 1),
"total_mentions_used": int(total_mentions_used),
"avg_mentions_per_tweet": round(avg_mentions_per_tweet, 1),
# Activity patterns
"most_active_hour": int(most_active_hour),
"most_active_day": str(most_active_day),
"top_activity_hours": list(hourly_distribution.keys())[:3],
# Performance metrics
"highest_likes": int(highest_likes),
"top_tweet_text": str(top_tweet_text),
"top_tweet_url": str(top_tweet_url),
"viral_tweets_count": int(viral_tweets_count),
"viral_content_percentage": round(viral_content_percentage, 1),
# Engagement ratios
"like_to_view_ratio": round(like_to_view_ratio, 2),
"retweet_to_like_ratio": round(retweet_to_like_ratio, 2),
"reply_to_like_ratio": round(reply_to_like_ratio, 2),
"engagement_score": round(engagement_score, 1),
"total_engagement": int(total_engagement),
}
def _get_empty_metrics(self) -> Dict:
"""Return empty metrics structure."""
return {
# Basic metrics
"likes_count": 0, "views_count": 0, "reply_count": 0, "repost_count": 0,
# Content quality metrics
"avg_likes_per_tweet": 0, "avg_views_per_tweet": 0, "avg_engagement_rate": 0,
"avg_tweet_length": 0, "longest_tweet_length": 0, "shortest_tweet_length": 0,
# Media usage metrics
"tweets_with_media_count": 0, "media_usage_percentage": 0,
"avg_likes_with_media": 0, "avg_likes_without_media": 0,
# Hashtag and mention metrics
"total_hashtags_used": 0, "avg_hashtags_per_tweet": 0, "tweets_with_hashtags_percentage": 0,
"total_mentions_used": 0, "avg_mentions_per_tweet": 0,
# Activity patterns
"most_active_hour": 0, "most_active_day": "Unknown", "top_activity_hours": [],
# Performance metrics
"highest_likes": 0, "top_tweet_text": "", "top_tweet_url": "",
"viral_tweets_count": 0, "viral_content_percentage": 0,
# Engagement ratios
"like_to_view_ratio": 0, "retweet_to_like_ratio": 0, "reply_to_like_ratio": 0,
"engagement_score": 0, "total_engagement": 0,
}
def _is_mock_tweet(self, item: Dict) -> bool:
"""Detect if a tweet is mock/invalid data that should be ignored."""
# Check for missing essential fields that real tweets should have
essential_fields = ['createdAt', 'text', 'author']
missing_fields = sum(1 for field in essential_fields if not item.get(field))
# If missing multiple essential fields, likely mock data
if missing_fields >= 2:
return True
# Check for empty or placeholder text
text = item.get("text", "").strip()
if not text or text.lower() in ["", "null", "undefined", "test", "placeholder"]:
return True
# Check for missing or empty author data
author = item.get("author", {})
if not author or not author.get("userName", "").strip():
return True
# Check for obviously fake/test usernames
username = author.get("userName", "").lower()
test_patterns = ["test", "mock", "fake", "placeholder", "example"]
if any(pattern in username for pattern in test_patterns):
return True
return False
def _process_single_tweet(self, item: Dict, hashtags_counter: Counter,
mentions_counter: Counter, all_author_data: List, target_username: str = None) -> Optional[Dict]:
"""Process a single tweet item."""
# Extract author data
author = item.get("author", {})
if author:
# Only collect author data from the target user if target_username is specified
# This prevents random accounts from being saved in replies data
if target_username:
author_username = author.get("userName", "").lower()
if author_username == target_username.lower():
all_author_data.append(author)
else:
all_author_data.append(author)
# Check if this is a mock/invalid tweet (has minimal or no real data)
is_mock_tweet = self._is_mock_tweet(item)
# Validate date information
created_at = item.get("createdAt", "")
if not created_at:
# Only show warning for real tweets missing dates, and only in debug mode
if not is_mock_tweet and st.session_state.get('debug_mode', False):
st.warning("Skipping a tweet due to missing date information")
return None
# Parse date
try:
date_obj_utc = datetime.strptime(created_at, TWITTER_DATE_FORMAT)
date_obj_ist = convert_to_ist(date_obj_utc)
except ValueError as e:
# Only log/warn for real tweets with invalid dates
if not is_mock_tweet:
if st.session_state.get('debug_mode', False):
st.warning(f"Skipping tweet due to invalid date format: {created_at}")
logger.warning(f"Invalid date format: {created_at}")
return None
# Extract text and analyze
text = item.get("text", "")
hashtags = [word.strip("#") for word in text.split() if word.startswith('#')]
mentions = [word.strip("@") for word in text.split() if word.startswith('@')]
# Update counters
hashtags_counter.update(hashtags)
mentions_counter.update(mentions)
return {
"Date": date_obj_ist.strftime("%Y-%m-%d %H:%M:%S"),
"Date_Only": date_obj_ist.strftime("%Y-%m-%d"),
"Hour": date_obj_ist.hour,
"Day_of_Week": date_obj_ist.strftime("%A"),
"Username": author.get("userName", ""),
"Text": text,
"Likes": item.get("likeCount", 0),
"Retweets": item.get("retweetCount", 0),
"Replies": item.get("replyCount", 0),
"Views": item.get("viewCount", 0),
"URL": item.get("url", ""),
"Has_Media": "extendedEntities" in item,
"Hashtags": ", ".join(hashtags),
"Mentions": ", ".join(mentions),
}
def _extract_best_account_details(self, all_author_data: List[Dict], target_username: str = None) -> Dict:
"""Extract the most complete account details from author data."""
if not all_author_data:
# If no author data and we have a target username, create a basic structure
if target_username:
return {
"name": target_username,
"username": target_username,
"bio": "",
"followers_count": 0,
"following_count": 0,
"tweet_count": 0,
"verified": False,
"profile_image_url": ""
}
return {}
# Find the author data with the most complete information
best_author = self._find_most_complete_author(all_author_data)
# Debug information
if st.session_state.get('debug_mode', False):
st.write("Debug - Found", len(all_author_data), "author objects")
st.write("Debug - Best author data keys:", list(best_author.keys()))
st.write("Debug - Best author data sample:", {
k: v for k, v in best_author.items()
if k in ['name', 'userName', 'followers', 'following', 'statusesCount']
})
return self._standardize_account_details(best_author)
def _find_most_complete_author(self, all_author_data: List[Dict]) -> Dict:
"""Find the author data object with the most complete information."""
best_author = {}
best_score = -1
for author in all_author_data:
score = self._calculate_author_completeness_score(author)
if score > best_score:
best_score = score
best_author = author
return best_author if best_score > 0 else (all_author_data[0] if all_author_data else {})
def _calculate_author_completeness_score(self, author: Dict) -> int:
"""Calculate completeness score for author data."""
score = 0
# Check for follower metrics (high priority)
followers = (author.get("followers") or author.get("followersCount") or
author.get("followers_count") or
author.get("publicMetrics", {}).get("followers_count") or
safe_get_nested(author, ["publicMetrics", "followers_count"]) or
safe_get_nested(author, ["public_metrics", "followers_count"]) or 0)
if followers > 0:
score += 3
following = (author.get("following") or author.get("followingCount") or
author.get("following_count") or author.get("friends_count") or
author.get("publicMetrics", {}).get("following_count") or
safe_get_nested(author, ["publicMetrics", "following_count"]) or
safe_get_nested(author, ["public_metrics", "following_count"]) or 0)
if following > 0:
score += 2
tweet_count = (author.get("statusesCount") or author.get("statuses_count") or
author.get("tweet_count") or
author.get("publicMetrics", {}).get("tweet_count") or
safe_get_nested(author, ["publicMetrics", "tweet_count"]) or
safe_get_nested(author, ["public_metrics", "tweet_count"]) or 0)
if tweet_count > 0:
score += 2
# Check for profile information (lower priority)
if author.get("description") or author.get("profile_bio"):
score += 1
if author.get("verified") or author.get("isVerified"):
score += 1
return score
def _convert_to_ist_format(self, twitter_date_str: str) -> str:
"""Convert Twitter date string to IST format."""
if not twitter_date_str or twitter_date_str == "":
return ""
try:
# Parse the Twitter date format: "Mon Jul 08 09:31:59 +0000 2013"
utc_dt = datetime.strptime(twitter_date_str, TWITTER_DATE_FORMAT)
# Convert to IST
ist_tz = pytz.timezone(IST_TIMEZONE)
ist_dt = utc_dt.astimezone(ist_tz)
# Format as a more readable IST date
# Format: "8 July 2013, 3:01 PM IST"
formatted_date = ist_dt.strftime("%d %B %Y, %I:%M %p IST")
return formatted_date
except ValueError:
# If parsing fails, return the original string
return twitter_date_str
def _standardize_account_details(self, author_data: Dict) -> Dict:
"""Standardize account details from various possible field names."""
# Debug: Print raw author data keys (only in debug mode)
if st.session_state.get('debug_mode', False):
st.write(f"Debug - Author data keys: {list(author_data.keys())}")
# Try multiple possible field names for metrics with additional variations
followers_count = (
author_data.get("followers") or
author_data.get("followersCount") or
author_data.get("followers_count") or
author_data.get("publicMetrics", {}).get("followers_count") or
safe_get_nested(author_data, ["publicMetrics", "followers_count"]) or
safe_get_nested(author_data, ["public_metrics", "followers_count"]) or
0
)
following_count = (
author_data.get("following") or
author_data.get("followingCount") or
author_data.get("following_count") or
author_data.get("friends_count") or
author_data.get("publicMetrics", {}).get("following_count") or
safe_get_nested(author_data, ["publicMetrics", "following_count"]) or
safe_get_nested(author_data, ["public_metrics", "following_count"]) or
0
)
tweet_count = (
author_data.get("statusesCount") or
author_data.get("statuses_count") or
author_data.get("tweet_count") or
author_data.get("publicMetrics", {}).get("tweet_count") or
safe_get_nested(author_data, ["publicMetrics", "tweet_count"]) or
safe_get_nested(author_data, ["public_metrics", "tweet_count"]) or
0
)
# Extract account creation date
raw_create_date = (
author_data.get("createdAt") or
author_data.get("created_at") or
author_data.get("account_create_date") or
""
)
# Convert to IST format if we have a valid date
account_create_date = self._convert_to_ist_format(raw_create_date)
return {
"name": author_data.get("name", ""),
"username": author_data.get("userName", "") or author_data.get("username", ""),
"bio": author_data.get("description", "") or author_data.get("bio", ""),
"followers_count": followers_count,
"following_count": following_count,
"tweet_count": tweet_count,
"verified": author_data.get("verified", False) or author_data.get("isVerified", False),
"profile_image_url": author_data.get("profileImageUrl", "") or author_data.get("profile_image_url", ""),
"account_create_date": account_create_date,
# Engagement metrics will be calculated from tweet data and added later
"likes_count": 0,
"views_count": 0,
"reply_count": 0,
"repost_count": 0,
}
# =============================================================================
# UI COMPONENTS
# =============================================================================
class UIComponents:
"""Reusable UI components for the dashboard."""
@staticmethod
def display_account_info(account_details: Dict) -> None:
"""Display account information section."""
if not account_details:
return
st.subheader(f"πŸ‘€ Account: @{account_details['username']}")
# Profile image
if account_details.get('profile_image_url'):
st.image(account_details['profile_image_url'], width=80)
# Account name and verification
verification_badge = 'βœ…' if account_details.get('verified') else ''
st.markdown(f"**{account_details.get('name')}** {verification_badge}")
# Bio
if account_details.get('bio'):
st.caption(account_details.get('bio'))
# Metrics
UIComponents._display_account_metrics(account_details)
st.divider()
@staticmethod
def _display_account_metrics(account_details: Dict) -> None:
"""Display account metrics (followers, following, posts)."""
# Account creation date
create_date = account_details.get('account_create_date', '')
if create_date:
st.caption(f"πŸ“… Account created: {create_date}")
# Basic metrics
m1, m2, m3 = st.columns(3)
followers = account_details.get('followers_count', 0)
following = account_details.get('following_count', 0)
posts = account_details.get('tweet_count', 0)
m1.metric(
"Followers",
format_large_number(followers),
help="Follower count from Twitter API"
)
m2.metric(
"Following",
format_large_number(following),
help="Following count from Twitter API"
)
m3.metric(
"Total Posts",
format_large_number(posts),
help="Total tweet count from Twitter API"
)
# Engagement metrics
likes = account_details.get('likes_count', 0)
views = account_details.get('views_count', 0)
replies = account_details.get('reply_count', 0)
reposts = account_details.get('repost_count', 0)
if likes > 0 or views > 0 or replies > 0 or reposts > 0:
st.caption("**πŸ“Š Total Engagement:**")
e1, e2, e3, e4 = st.columns(4)
e1.metric(
"Likes",
format_large_number(likes),
help="Total likes count"
)
e2.metric(
"Views",
format_large_number(views),
help="Total views/impressions count"
)
e3.metric(
"Replies",
format_large_number(replies),
help="Total replies count"
)
e4.metric(
"Reposts",
format_large_number(reposts),
help="Total reposts/retweets count"
)
# Advanced metrics sections
UIComponents._display_content_quality_metrics(account_details)
UIComponents._display_media_usage_metrics(account_details)
UIComponents._display_activity_patterns(account_details)
UIComponents._display_performance_metrics(account_details)
UIComponents._display_engagement_ratios(account_details)
# Warning for missing data
if followers == 0 and following == 0 and posts == 0:
st.warning("⚠️ Account metrics unavailable - this may be due to API limitations or account privacy settings")
@staticmethod
def _display_content_quality_metrics(account_details: Dict) -> None:
"""Display content quality metrics."""
avg_likes = account_details.get('avg_likes_per_tweet', 0)
avg_views = account_details.get('avg_views_per_tweet', 0)
engagement_rate = account_details.get('avg_engagement_rate', 0)
avg_length = account_details.get('avg_tweet_length', 0)
if avg_likes > 0 or avg_views > 0 or engagement_rate > 0:
st.caption("**πŸ“ˆ Content Quality:**")
q1, q2, q3, q4 = st.columns(4)
q1.metric(
"Avg Likes/Tweet",
f"{avg_likes:.1f}",
help="Average likes per tweet"
)
q2.metric(
"Avg Views/Tweet",
format_large_number(int(avg_views)),
help="Average views per tweet"
)
q3.metric(
"Engagement Rate",
f"{engagement_rate:.1f}%",
help="(Likes + Retweets) / Views * 100"
)
q4.metric(
"Avg Tweet Length",
f"{avg_length:.0f} chars",
help="Average character length per tweet"
)
@staticmethod
def _display_media_usage_metrics(account_details: Dict) -> None:
"""Display media usage metrics."""
media_count = account_details.get('tweets_with_media_count', 0)
media_percentage = account_details.get('media_usage_percentage', 0)
likes_with_media = account_details.get('avg_likes_with_media', 0)
likes_without_media = account_details.get('avg_likes_without_media', 0)
if media_count > 0 or media_percentage > 0:
st.caption("**🎬 Media Usage:**")
m1, m2, m3, m4 = st.columns(4)
m1.metric(
"Tweets with Media",
f"{media_count}",
help="Number of tweets with media attachments"
)
m2.metric(
"Media Usage",
f"{media_percentage:.1f}%",
help="Percentage of tweets with media"
)
m3.metric(
"Avg Likes (Media)",
f"{likes_with_media:.1f}",
help="Average likes for tweets with media"
)
m4.metric(
"Avg Likes (No Media)",
f"{likes_without_media:.1f}",
help="Average likes for tweets without media"
)
@staticmethod
def _display_activity_patterns(account_details: Dict) -> None:
"""Display activity pattern metrics."""
most_active_hour = account_details.get('most_active_hour', 0)
most_active_day = account_details.get('most_active_day', 'Unknown')
top_hours = account_details.get('top_activity_hours', [])
if most_active_hour > 0 or most_active_day != 'Unknown':
st.caption("**⏰ Activity Patterns:**")
a1, a2, a3, a4 = st.columns(4)
a1.metric(
"Most Active Hour",
f"{most_active_hour}:00",
help="Hour of day with most tweets"
)
a2.metric(
"Most Active Day",
most_active_day,
help="Day of week with most tweets"
)
a3.metric(
"Top Hours",
", ".join([f"{h}:00" for h in top_hours[:2]]),
help="Top active hours"
)
# Hashtag and mention usage
hashtags = account_details.get('total_hashtags_used', 0)
mentions = account_details.get('total_mentions_used', 0)
a4.metric(
"Hashtags Used",
f"{hashtags}",
help="Total hashtags used in tweets"
)
@staticmethod
def _display_performance_metrics(account_details: Dict) -> None:
"""Display performance metrics."""
highest_likes = account_details.get('highest_likes', 0)
viral_count = account_details.get('viral_tweets_count', 0)
viral_percentage = account_details.get('viral_content_percentage', 0)
top_tweet_text = account_details.get('top_tweet_text', '')
top_tweet_url = account_details.get('top_tweet_url', '')
if highest_likes > 0 or viral_count > 0:
st.caption("**πŸš€ Performance:**")
p1, p2, p3, p4 = st.columns(4)
p1.metric(
"Highest Likes",
format_large_number(highest_likes),
help="Most likes on a single tweet"
)
p2.metric(
"Viral Tweets",
f"{viral_count}",
help="Tweets in top 10% by likes"
)
p3.metric(
"Viral Content %",
f"{viral_percentage:.1f}%",
help="Percentage of viral tweets"
)
p4.metric(
"Engagement Score",
f"{account_details.get('engagement_score', 0):.1f}",
help="Weighted engagement score (likesΓ—1 + retweetsΓ—2 + repliesΓ—3)"
)
# Show top tweet if available
if top_tweet_text and top_tweet_url:
st.caption("**πŸ† Top Performing Tweet:**")
with st.expander("View top tweet"):
st.write(f"**Likes:** {format_large_number(highest_likes)}")
st.write(f"**Text:** {top_tweet_text}")
st.write(f"**URL:** {top_tweet_url}")
@staticmethod
def _display_engagement_ratios(account_details: Dict) -> None:
"""Display engagement ratio metrics."""
like_to_view = account_details.get('like_to_view_ratio', 0)
retweet_to_like = account_details.get('retweet_to_like_ratio', 0)
reply_to_like = account_details.get('reply_to_like_ratio', 0)
total_engagement = account_details.get('total_engagement', 0)
if like_to_view > 0 or retweet_to_like > 0 or reply_to_like > 0:
st.caption("**πŸ“Š Engagement Ratios:**")
r1, r2, r3, r4 = st.columns(4)
r1.metric(
"Like Rate",
f"{like_to_view:.2f}%",
help="Likes per view percentage"
)
r2.metric(
"Retweet Rate",
f"{retweet_to_like:.2f}%",
help="Retweets per like percentage"
)
r3.metric(
"Reply Rate",
f"{reply_to_like:.2f}%",
help="Replies per like percentage"
)
r4.metric(
"Total Engagement",
format_large_number(total_engagement),
help="Total likes + retweets + replies"
)
@staticmethod
def display_key_metrics(df: pd.DataFrame) -> None:
"""Display key engagement metrics."""
if df.empty:
return
st.subheader("πŸ“ˆ Key Metrics")
# Basic metrics
c1, c2, c3 = st.columns(3)
c1.metric("Total Tweets Scanned", f"{len(df):,}")
c2.metric("Total Likes", f"{df['Likes'].sum():,}")
c3.metric("Total Retweets", f"{df['Retweets'].sum():,}")
# Engagement metrics
st.subheader("⚑ Engagement")
df_copy = df.copy()
df_copy["Engagement"] = df_copy["Likes"] + df_copy["Retweets"] + df_copy["Replies"]
total_engagement = df_copy["Engagement"].sum()
avg_engagement = total_engagement / len(df) if len(df) > 0 else 0
total_views = df["Views"].sum()
engagement_rate = (total_engagement / total_views * 100) if total_views > 0 else 0
e1, e2 = st.columns(2)
e1.metric("Avg. Engagement/Tweet", f"{avg_engagement:.1f}")
e2.metric("Engagement Rate (vs Views)", f"{engagement_rate:.2f}%")
st.divider()
@staticmethod
def display_content_analysis(metrics: Dict) -> None:
"""Display content analysis section."""
st.subheader("πŸ” Content Analysis")
top_hashtags = metrics.get("top_hashtags", [])
top_mentions = metrics.get("top_mentions", [])
if top_hashtags:
st.markdown("**Top Hashtags**")
st.write(", ".join([f"`#{tag}` ({count})" for tag, count in top_hashtags]))
if top_mentions:
st.markdown("**Top Mentions**")
st.write(", ".join([f"`@{user}` ({count})" for user, count in top_mentions]))
@staticmethod
def display_ai_summary(gemini_summary: Optional[str]) -> None:
"""Display AI-generated summary section."""
if gemini_summary:
st.subheader("🧠 AI Summary & Recommendations")
st.markdown(gemini_summary)
st.divider()
@staticmethod
def display_most_engaging_tweet(df: pd.DataFrame) -> None:
"""Display the most engaging tweet."""
if df.empty:
return
st.subheader("🌟 Most Engaging Tweet")
df_copy = df.copy()
df_copy["Engagement"] = df_copy["Likes"] + df_copy["Retweets"] + df_copy["Replies"]
most_engaging = df_copy.loc[df_copy["Engagement"].idxmax()]
with st.container(border=True):
st.markdown(f"**{most_engaging['Text']}**")
stats = (f"❀️ {most_engaging['Likes']} | πŸ”„ {most_engaging['Retweets']} | "
f"πŸ’¬ {most_engaging['Replies']} | πŸ‘οΈ {most_engaging['Views']}")
st.markdown(f"**{stats}** | [{most_engaging['Date']}]({most_engaging['URL']})")
st.divider()
@staticmethod
def display_charts(df: pd.DataFrame) -> None:
"""Display data visualization charts."""
if df.empty:
return
st.subheader("πŸ“… Posting Patterns")
# Tweets by day
df_by_day = df.groupby('Date_Only')['Text'].count().reset_index()
df_by_day['Date_Only'] = pd.to_datetime(df_by_day['Date_Only'])
fig_day = px.line(
df_by_day,
x='Date_Only',
y='Text',
title="Tweets per Day",
labels={'Date_Only': 'Date', 'Text': 'Count'}
)
st.plotly_chart(fig_day, use_container_width=True)
@staticmethod
def display_data_download(df: pd.DataFrame) -> None:
"""Display raw data table with download option."""
st.subheader("πŸ“Š Raw Data")
st.dataframe(df)
if not df.empty:
csv = df.to_csv(index=False).encode('utf-8')
st.download_button(
"πŸ“₯ Download as CSV",
csv,
f"twitter_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
"text/csv",
key="download-csv",
use_container_width=True
)
# =============================================================================
# DASHBOARD MANAGEMENT
# =============================================================================
class TwitterDashboard:
"""Main dashboard for displaying Twitter analysis results."""
def __init__(self, df: pd.DataFrame, metrics: Dict, dataset_id: str,
analysis_type: str = "Account's Tweets", gemini_summary: Optional[str] = None):
self.df = df
self.metrics = metrics
self.dataset_id = dataset_id
self.analysis_type = analysis_type
self.gemini_summary = gemini_summary
def render(self) -> None:
"""Render the complete dashboard."""
if self.df.empty:
st.warning("No data available to display.")
return
# Main layout
left_col, right_col = st.columns([1, 1], gap="large")
with left_col:
# Only show account info for "Account's Tweets" analysis
if self.analysis_type == "Account's Tweets":
UIComponents.display_account_info(self.metrics.get("account_details", {}))
else:
# For "Comments to Account", show a different header
st.subheader(f"πŸ’¬ Comments Analysis")
st.info("Analyzing comments and replies directed to the account")
st.divider()
UIComponents.display_key_metrics(self.df)
UIComponents.display_content_analysis(self.metrics)
with right_col:
UIComponents.display_ai_summary(self.gemini_summary)
UIComponents.display_most_engaging_tweet(self.df)
UIComponents.display_charts(self.df)
# Full-width sections
UIComponents.display_data_download(self.df)
# =============================================================================
# SCHEDULER MANAGEMENT
# =============================================================================
class SchedulerManager:
"""Manages scheduled users and automation settings."""
def __init__(self, db: DatabaseManager):
self.db = db
def render_controls(self) -> None:
"""Render scheduler management interface."""
st.header("πŸ•’ Scheduler Management")
if not self.db.is_connected:
st.warning("⚠️ Database not connected. Scheduler features unavailable.")
return
self._display_current_users()
st.divider()
self._display_add_user_form()
st.divider()
self._display_scheduler_info()
def _display_current_users(self) -> None:
"""Display currently scheduled users."""
st.subheader("πŸ“‹ Current Scheduled Users")
try:
scheduled_users = list(self.db.scheduler_users_collection.find({"active": True}))
usernames = [user["username"] for user in scheduled_users]
except Exception as e:
st.error(f"Error fetching scheduled users: {e}")
return
if usernames:
for username in usernames:
col1, col2 = st.columns([3, 1])
with col1:
st.write(f"@{username}")
with col2:
if st.button("πŸ—‘οΈ", key=f"remove_{username}", help=f"Remove @{username}"):
if self._remove_user(username):
st.rerun()
else:
st.info("No users currently scheduled.")
def _display_add_user_form(self) -> None:
"""Display form to add new users."""
st.subheader("βž• Add New User")
new_username = st.text_input("Username to schedule (without @)", key="new_scheduled_user")
col1, col2 = st.columns(2)
with col1:
if st.button("Add User", use_container_width=True, disabled=not new_username):
if self._add_user(new_username):
st.success(f"βœ… Added @{new_username} to scheduler")
st.rerun()
with col2:
if st.button("πŸ”„ Refresh List", use_container_width=True):
st.rerun()
def _display_scheduler_info(self) -> None:
"""Display scheduler information."""
st.subheader("ℹ️ Scheduler Info")
st.info("""
**GitHub Actions Automation:**
- Runs daily at 12:00 AM IST automatically
- Can be triggered manually from GitHub Actions tab
- Scrapes only the previous day's data (no overlap)
- Stores results in MongoDB with duplicate detection
""")
def _add_user(self, username: str) -> bool:
"""Add user to scheduled scraping list."""
try:
# Check if user already exists
existing_users = list(self.db.scheduler_users_collection.find({"active": True}))
if username in [user["username"] for user in existing_users]:
st.warning("User already scheduled")
return False
user_doc = {
"username": username,
"active": True,
"added_at": datetime.utcnow(),
"last_scraped": None
}
self.db.scheduler_users_collection.update_one(
{"username": username},
{"$set": user_doc},
upsert=True
)
return True
except Exception as e:
st.error(f"Error adding user: {e}")
return False
def _remove_user(self, username: str) -> bool:
"""Remove user from scheduled scraping list."""
try:
self.db.scheduler_users_collection.update_one(
{"username": username},
{"$set": {"active": False}}
)
return True
except Exception as e:
st.error(f"Error removing user: {e}")
return False
# =============================================================================
# MAIN APPLICATION
# =============================================================================
class TwitterAnalyzerApp:
"""Main Twitter Analyzer application."""
def __init__(self):
self._setup_page()
self._initialize_services()
def _setup_page(self) -> None:
"""Configure Streamlit page settings."""
st.set_page_config(**PAGE_CONFIG)
st.title("🐦 Twitter Content Analyzer")
def _initialize_services(self) -> None:
"""Initialize all required services."""
try:
self.config = AppConfig()
self.db = DatabaseManager(self.config.mongodb_uri)
self.apify = ApifyService(self.config.apify_api_key)
self.gemini = GeminiService(self.config.gemini_api_key) if self.config.gemini_api_key else None
self.processor = TweetDataProcessor()
self.scheduler = SchedulerManager(self.db)
except ValueError as e:
st.error(f"Initialization failed: {e}. Please check your .env.local file.")
st.stop()
def run(self) -> None:
"""Execute the main application."""
self._render_sidebar()
if not hasattr(self, 'run_button') or not self.run_button or not self.username:
st.info("Please enter a Twitter username and click 'Analyze' to begin.")
return
self._perform_analysis()
def _render_sidebar(self) -> None:
"""Render the application sidebar."""
with st.sidebar:
self._render_analysis_controls()
self._render_debug_options()
st.divider()
self.scheduler.render_controls()
def _render_analysis_controls(self) -> None:
"""Render analysis control widgets."""
st.header("βš™οΈ Analysis Controls")
self.analysis_type = st.radio(
"Analysis Type",
["Account's Tweets", "Comments to Account"],
horizontal=True
)
self.username = st.text_input("Twitter Username (without @)", DEFAULT_USERNAME)
# Date inputs
today = datetime.now()
last_week = today - timedelta(days=DEFAULT_DAYS_BACK)
self.since_date = st.date_input("Start Date", last_week)
self.until_date = st.date_input("End Date", today)
self.run_button = st.button("πŸš€ Analyze", use_container_width=True, type="primary")
def _render_debug_options(self) -> None:
"""Render debug options."""
with st.expander("πŸ”§ Debug Options"):
st.session_state['debug_mode'] = st.checkbox(
"Show API Debug Info",
help="Shows raw API data for troubleshooting"
)
def _perform_analysis(self) -> None:
"""Perform the main analysis workflow."""
since_str = self.since_date.strftime("%Y-%m-%d")
until_str = self.until_date.strftime("%Y-%m-%d")
# Fetch data based on analysis type
try:
if self.analysis_type == "Account's Tweets":
raw_data, dataset_id = self.apify.fetch_account_tweets(self.username, since_str, until_str)
context = f"This is an analysis of tweets by the Twitter account @{self.username}."
else:
raw_data, dataset_id = self.apify.fetch_account_comments(self.username, since_str, until_str)
context = f"This is an analysis of comments/replies sent to the Twitter account @{self.username}."
if not raw_data:
st.error("No data was returned from the API. The account may be private, have no tweets in the selected range, or there might be an API issue.")
return
# Process data
df, metrics = self.processor.process_tweets(raw_data, self.username)
# Generate AI summary if available
gemini_summary = None
if self.gemini:
gemini_summary = self.gemini.generate_analysis(df.head(100), context)
else:
st.warning("GEMINI_API_KEY not found. AI summary will be skipped.")
# Display results
dashboard = TwitterDashboard(df, metrics, dataset_id, self.analysis_type, gemini_summary)
dashboard.render()
except Exception as e:
logger.error(f"Analysis failed: {e}")
st.error(f"Analysis failed: {str(e)}")
# =============================================================================
# APPLICATION ENTRY POINT
# =============================================================================
def main():
"""Application entry point."""
app = TwitterAnalyzerApp()
app.run()
if __name__ == "__main__":
main()