# -- coding: utf-8 -- import gradio as gr import json import os import logging import html import pandas as pd from datetime import datetime, timedelta, timezone # Added timezone # Import functions from your custom modules from analytics_fetch_and_rendering import fetch_and_render_analytics from gradio_utils import get_url_user_token from Bubble_API_Calls import ( fetch_linkedin_token_from_bubble, bulk_upload_to_bubble, fetch_linkedin_posts_data_from_bubble # This will be used for posts, mentions, and follower stats ) from Linkedin_Data_API_Calls import ( fetch_linkedin_posts_core, fetch_comments, analyze_sentiment, # For post comments compile_detailed_posts, prepare_data_for_bubble, # For posts, stats, comments fetch_linkedin_mentions_core, analyze_mentions_sentiment, # For individual mentions compile_detailed_mentions, # Compiles to user-specified format prepare_mentions_for_bubble # Prepares user-specified format for Bubble ) # Import follower stats function from linkedin_follower_stats import get_linkedin_follower_stats # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # --- Global Constants --- DEFAULT_INITIAL_FETCH_COUNT = 10 LINKEDIN_POST_URN_KEY = 'id' BUBBLE_POST_URN_COLUMN_NAME = 'id' # Assuming this is the unique post ID in Bubble BUBBLE_POST_DATE_COLUMN_NAME = 'published_at' # Assuming this is the post publication date in Bubble # Constants for Mentions BUBBLE_MENTIONS_TABLE_NAME = "LI_mentions" BUBBLE_MENTIONS_ID_COLUMN_NAME = "id" # Assuming this is the unique mention ID in Bubble BUBBLE_MENTIONS_DATE_COLUMN_NAME = "date" # Assuming this is the mention date in Bubble DEFAULT_MENTIONS_INITIAL_FETCH_COUNT = 20 DEFAULT_MENTIONS_UPDATE_FETCH_COUNT = 10 # Constants for Follower Stats BUBBLE_FOLLOWER_STATS_TABLE_NAME = "LI_follower_stats" FOLLOWER_STATS_CATEGORY_COLUMN = "category_name" # For demographics: name (e.g., "Engineering"), for monthly gains: date string 'YYYY-MM-DD' 
FOLLOWER_STATS_TYPE_COLUMN = "follower_count_type"  # e.g., "follower_seniority", "follower_gains_monthly"
FOLLOWER_STATS_ORG_URN_COLUMN = "organization_urn"  # URN of the organization
FOLLOWER_STATS_ORGANIC_COLUMN = "follower_count_organic"
FOLLOWER_STATS_PAID_COLUMN = "follower_count_paid"


def check_token_status(token_state):
    """Checks the status of the LinkedIn token."""
    return "â Token available" if token_state and token_state.get("token") else "â Token not available"


def _fetch_bubble_table_df(org_urn, table_name):
    """Fetch one table from Bubble for `org_urn`.

    Returns a DataFrame (empty on any error or missing data) and never raises,
    so a failure for one table cannot abort the state-initialisation flow.
    Replaces three duplicated try/except fetch blocks in the original.
    """
    logging.info(f"Attempting to fetch {table_name} from Bubble for org_urn: {org_urn}")
    try:
        fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(org_urn, table_name)
        if error_message:
            logging.warning(f"Error fetching {table_name} from Bubble: {error_message}.")
        if error_message or fetched_df is None:
            return pd.DataFrame()
        return fetched_df
    except Exception as e:
        logging.error(f"â Error fetching {table_name} from Bubble: {e}.", exc_info=True)
        return pd.DataFrame()


def _determine_posts_fetch_count(posts_df):
    """Decide how many posts to request from the LinkedIn API.

    Returns DEFAULT_INITIAL_FETCH_COUNT when no usable data/dates exist,
    ~10 posts per week of staleness when the newest post is >= 7 days old,
    and 0 when the data is recent.
    """
    if posts_df.empty:
        logging.info(f"âšī¸ No posts in Bubble. Setting to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts.")
        return DEFAULT_INITIAL_FETCH_COUNT
    try:
        df_posts_check = posts_df.copy()  # Avoid mutating the state's DataFrame
        if BUBBLE_POST_DATE_COLUMN_NAME not in df_posts_check.columns or \
                df_posts_check[BUBBLE_POST_DATE_COLUMN_NAME].isnull().all():
            logging.warning(f"Date column '{BUBBLE_POST_DATE_COLUMN_NAME}' for posts missing or all null values. Triggering initial fetch.")
            return DEFAULT_INITIAL_FETCH_COUNT
        df_posts_check[BUBBLE_POST_DATE_COLUMN_NAME] = pd.to_datetime(
            df_posts_check[BUBBLE_POST_DATE_COLUMN_NAME], errors='coerce', utc=True)
        last_post_date_utc = df_posts_check[BUBBLE_POST_DATE_COLUMN_NAME].dropna().max()
        if pd.isna(last_post_date_utc):  # No valid dates found after conversion
            logging.warning("No valid post dates found after conversion. Triggering initial fetch.")
            return DEFAULT_INITIAL_FETCH_COUNT
        days_diff = (pd.Timestamp('now', tz='UTC').normalize() - last_post_date_utc.normalize()).days
        if days_diff >= 7:
            # Fetch more if data is older, e.g., 10 posts per week of difference
            fetch_count = max(1, days_diff // 7) * 10
            logging.info(f"Posts data is {days_diff} days old. Setting fetch count to {fetch_count}.")
            return fetch_count
        logging.info("Posts data is recent. No new posts fetch needed based on date.")
        return 0
    except Exception as e:
        logging.error(f"Error processing post dates: {e}. Defaulting to initial fetch for posts.", exc_info=True)
        return DEFAULT_INITIAL_FETCH_COUNT


def _mentions_need_sync(mentions_df):
    """Return True when Bubble's mentions are empty, undated, or >= 7 days old."""
    if mentions_df.empty:
        logging.info("Mentions need sync: Bubble mentions DF is empty.")
        return True
    if BUBBLE_MENTIONS_DATE_COLUMN_NAME not in mentions_df.columns or \
            mentions_df[BUBBLE_MENTIONS_DATE_COLUMN_NAME].isnull().all():
        logging.info(f"Mentions need sync: Date column '{BUBBLE_MENTIONS_DATE_COLUMN_NAME}' missing or all null values.")
        return True
    # Convert a Series copy only; the state's DataFrame is left untouched.
    last_mention_date_utc = pd.to_datetime(
        mentions_df[BUBBLE_MENTIONS_DATE_COLUMN_NAME], errors='coerce', utc=True).dropna().max()
    if pd.isna(last_mention_date_utc) or \
            (pd.Timestamp('now', tz='UTC').normalize() - last_mention_date_utc.normalize()).days >= 7:
        logging.info(f"Mentions need sync: Last mention date {last_mention_date_utc} is old or invalid.")
        return True
    logging.info(f"Mentions up-to-date. Last mention: {last_mention_date_utc}")
    return False


def _follower_stats_need_sync(fs_df):
    """Return True when follower stats are missing, demographic rows are absent,
    or the latest monthly-gain entry predates the start of the current month."""
    if fs_df.empty:
        logging.info("Follower stats need sync: Bubble follower stats DF is empty.")
        return True
    need_sync = False
    monthly_gains_df = fs_df[fs_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].copy()
    if monthly_gains_df.empty:
        logging.info("Follower stats need sync: No monthly gains data in Bubble.")
        need_sync = True
    elif FOLLOWER_STATS_CATEGORY_COLUMN not in monthly_gains_df.columns:
        logging.info(f"Follower stats need sync: Date column '{FOLLOWER_STATS_CATEGORY_COLUMN}' missing in monthly gains.")
        need_sync = True
    else:
        # FIX: convert with utc=True so the comparison below is aware-vs-aware;
        # the original produced tz-naive dates and compared them against a
        # tz-aware 'now', which pandas rejects with a TypeError.
        monthly_gains_df.loc[:, FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(
            monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce', utc=True).dt.normalize()
        last_gain_date = monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN].dropna().max()
        if pd.isna(last_gain_date):  # No valid dates after conversion
            logging.info("Follower stats need sync: No valid dates in monthly gains after conversion.")
            need_sync = True
        else:
            # Sync if the last recorded gain is for a month *before* the start of the
            # current month, so we attempt to fetch the previous month's data.
            start_of_current_month = pd.Timestamp('now', tz='UTC').normalize().replace(day=1)
            if last_gain_date < start_of_current_month:
                logging.info(f"Follower stats need sync: Last gain date {last_gain_date} is before current month start {start_of_current_month}.")
                need_sync = True
            else:
                logging.info(f"Follower monthly gains up-to-date. Last gain recorded on: {last_gain_date}")
    # Also trigger sync if demographic data (non-monthly gains) is missing entirely.
    if fs_df[fs_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty:
        logging.info("Follower stats need sync: Demographic data (non-monthly types) missing.")
        need_sync = True
    return need_sync


def process_and_store_bubble_token(url_user_token, org_urn, token_state):
    """
    Processes user token, fetches LinkedIn token, fetches existing Bubble data
    (posts, mentions, follower stats), and determines if an initial fetch or
    update is needed for each data type.

    Returns:
        (token_status_message, new_state, button_update) for the Gradio UI.
    """
    logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'")

    # Initialize or update state safely.
    new_state = token_state.copy() if token_state else {
        "token": None, "client_id": None, "org_urn": None,
        "bubble_posts_df": pd.DataFrame(), "fetch_count_for_api": 0,
        "bubble_mentions_df": pd.DataFrame(), "bubble_follower_stats_df": pd.DataFrame(),
        "url_user_token_temp_storage": None,
    }
    new_state.update({
        "org_urn": org_urn,
        "bubble_posts_df": new_state.get("bubble_posts_df", pd.DataFrame()),  # Ensure DF exists
        "fetch_count_for_api": new_state.get("fetch_count_for_api", 0),
        "bubble_mentions_df": new_state.get("bubble_mentions_df", pd.DataFrame()),  # Ensure DF exists
        "bubble_follower_stats_df": new_state.get("bubble_follower_stats_df", pd.DataFrame()),  # Ensure DF exists
        "url_user_token_temp_storage": url_user_token,
    })

    button_update = gr.update(visible=False, interactive=False, value="đ Sync LinkedIn Data")  # Default to hidden

    client_id = os.environ.get("Linkedin_client_id")
    new_state["client_id"] = client_id if client_id else "ENV VAR MISSING"
    if not client_id:
        logging.error("CRITICAL ERROR: 'Linkedin_client_id' environment variable not set.")

    # Fetch LinkedIn token from Bubble (only when the URL token looks valid).
    if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token:
        logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}")
        try:
            parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
            if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
                new_state["token"] = parsed_linkedin_token
                logging.info("â LinkedIn Token successfully fetched from Bubble.")
            else:
                new_state["token"] = None
                logging.warning(f"â Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
        except Exception as e:
            new_state["token"] = None
            logging.error(f"â Exception while fetching LinkedIn token from Bubble: {e}", exc_info=True)
    else:
        new_state["token"] = None
        logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")

    # Fetch existing data from Bubble if Org URN is available.
    current_org_urn = new_state.get("org_urn")
    if current_org_urn:
        new_state["bubble_posts_df"] = _fetch_bubble_table_df(current_org_urn, "LI_posts")
        new_state["bubble_mentions_df"] = _fetch_bubble_table_df(current_org_urn, BUBBLE_MENTIONS_TABLE_NAME)
        new_state["bubble_follower_stats_df"] = _fetch_bubble_table_df(current_org_urn, BUBBLE_FOLLOWER_STATS_TABLE_NAME)
    else:
        logging.warning("Org URN not available in state. Cannot fetch data from Bubble.")
        new_state["bubble_posts_df"] = pd.DataFrame()
        new_state["bubble_mentions_df"] = pd.DataFrame()
        new_state["bubble_follower_stats_df"] = pd.DataFrame()

    # Per-data-type staleness checks.
    new_state['fetch_count_for_api'] = _determine_posts_fetch_count(new_state["bubble_posts_df"])
    mentions_need_sync = _mentions_need_sync(new_state["bubble_mentions_df"])
    follower_stats_need_sync = _follower_stats_need_sync(new_state.get("bubble_follower_stats_df", pd.DataFrame()))

    # Update Sync Button based on token and needed actions.
    sync_actions = []
    if new_state['fetch_count_for_api'] > 0:
        sync_actions.append(f"{new_state['fetch_count_for_api']} Posts")
    if mentions_need_sync:
        sync_actions.append("Mentions")
    if follower_stats_need_sync:
        sync_actions.append("Follower Stats")

    if new_state["token"] and sync_actions:  # Token present and actions needed
        button_label = f"đ Sync LinkedIn Data ({', '.join(sync_actions)})"
        button_update = gr.update(value=button_label, visible=True, interactive=True)
    elif new_state["token"]:  # Token present but nothing to sync
        button_update = gr.update(value="â Data Up-to-Date", visible=True, interactive=False)  # Visible but not interactive
    else:  # No token
        button_update = gr.update(visible=False, interactive=False)  # Keep hidden

    token_status_message = check_token_status(new_state)
    logging.info(f"Token processing complete. Status: {token_status_message}. Button: {button_update}. Sync actions: {sync_actions}")
    return token_status_message, new_state, button_update
", token_state client_id = token_state.get("client_id") token_dict = token_state.get("token") org_urn = token_state.get('org_urn') bubble_mentions_df = token_state.get("bubble_mentions_df", pd.DataFrame()).copy() # Work with a copy if not org_urn or not client_id or client_id == "ENV VAR MISSING": logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).") return "Mentions: Config error. ", token_state # Determine if mentions sync is needed and how many to fetch fetch_count_for_mentions_api = 0 mentions_sync_is_needed_now = False if bubble_mentions_df.empty: mentions_sync_is_needed_now = True fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT logging.info("Mentions sync needed: Bubble DF empty. Fetching initial count.") else: if BUBBLE_MENTIONS_DATE_COLUMN_NAME not in bubble_mentions_df.columns or \ bubble_mentions_df[BUBBLE_MENTIONS_DATE_COLUMN_NAME].isnull().all(): mentions_sync_is_needed_now = True fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT logging.info(f"Mentions sync needed: Date column '{BUBBLE_MENTIONS_DATE_COLUMN_NAME}' missing or all null. Fetching initial count.") else: mentions_df_copy = bubble_mentions_df.copy() # Redundant copy, already copied above mentions_df_copy[BUBBLE_MENTIONS_DATE_COLUMN_NAME] = pd.to_datetime(mentions_df_copy[BUBBLE_MENTIONS_DATE_COLUMN_NAME], errors='coerce', utc=True) last_mention_date_utc = mentions_df_copy[BUBBLE_MENTIONS_DATE_COLUMN_NAME].dropna().max() if pd.isna(last_mention_date_utc) or \ (pd.Timestamp('now', tz='UTC').normalize() - last_mention_date_utc.normalize()).days >= 7: mentions_sync_is_needed_now = True fetch_count_for_mentions_api = DEFAULT_MENTIONS_UPDATE_FETCH_COUNT # Fetch update count if data is old logging.info(f"Mentions sync needed: Last mention date {last_mention_date_utc} is old or invalid. Fetching update count.") if not mentions_sync_is_needed_now: logging.info("Mentions data is fresh based on current check. 
No API fetch needed for mentions.") return "Mentions: Up-to-date. ", token_state logging.info(f"Mentions sync proceeding. Fetch count: {fetch_count_for_mentions_api}") try: processed_raw_mentions = fetch_linkedin_mentions_core(client_id, token_dict, org_urn, count=fetch_count_for_mentions_api) if not processed_raw_mentions: logging.info("Mentions sync: No new mentions found via API.") return "Mentions: None found via API. ", token_state existing_mention_ids = set() if not bubble_mentions_df.empty and BUBBLE_MENTIONS_ID_COLUMN_NAME in bubble_mentions_df.columns: # Ensure IDs are strings for reliable comparison, handling potential NaNs existing_mention_ids = set(bubble_mentions_df[BUBBLE_MENTIONS_ID_COLUMN_NAME].dropna().astype(str)) sentiments_map = analyze_mentions_sentiment(processed_raw_mentions) # Assumes this returns a map {mention_id: sentiment_data} all_compiled_mentions = compile_detailed_mentions(processed_raw_mentions, sentiments_map) # Assumes this adds sentiment to each mention dict # Filter out mentions already in Bubble new_compiled_mentions_to_upload = [ m for m in all_compiled_mentions if str(m.get("id")) not in existing_mention_ids ] if not new_compiled_mentions_to_upload: logging.info("Mentions sync: All fetched mentions are already in Bubble.") return "Mentions: All fetched already in Bubble. 
", token_state bubble_ready_mentions = prepare_mentions_for_bubble(new_compiled_mentions_to_upload) # Prepare for Bubble format if bubble_ready_mentions: bulk_upload_to_bubble(bubble_ready_mentions, BUBBLE_MENTIONS_TABLE_NAME) logging.info(f"Successfully uploaded {len(bubble_ready_mentions)} new mentions to Bubble.") # Update in-memory DataFrame updated_mentions_df = pd.concat([bubble_mentions_df, pd.DataFrame(bubble_ready_mentions)], ignore_index=True) # Drop duplicates based on ID, keeping the latest (which would be the newly added ones if IDs overlapped, though logic above should prevent this) token_state["bubble_mentions_df"] = updated_mentions_df.drop_duplicates(subset=[BUBBLE_MENTIONS_ID_COLUMN_NAME], keep='last') return f"Mentions: Synced {len(bubble_ready_mentions)} new. ", token_state else: logging.info("Mentions sync: No new mentions were prepared for Bubble upload (possibly all filtered or empty after prep).") return "Mentions: No new ones to upload. ", token_state except ValueError as ve: # Catch specific errors if your API calls raise them logging.error(f"ValueError during mentions sync: {ve}", exc_info=True) return f"Mentions Error: {html.escape(str(ve))}. ", token_state except Exception as e: logging.exception("Unexpected error in sync_linkedin_mentions.") # Logs full traceback return f"Mentions: Unexpected error ({type(e).__name__}). ", token_state def sync_linkedin_follower_stats(token_state): """Fetches new LinkedIn follower statistics and uploads them to Bubble.""" logging.info("Starting LinkedIn follower stats sync process.") if not token_state or not token_state.get("token"): logging.error("Follower Stats sync: Access denied. No LinkedIn token.") return "Follower Stats: No token. 
", token_state client_id = token_state.get("client_id") token_dict = token_state.get("token") org_urn = token_state.get('org_urn') if not org_urn or not client_id or client_id == "ENV VAR MISSING": logging.error("Follower Stats sync: Configuration error (Org URN or Client ID missing).") return "Follower Stats: Config error. ", token_state # Determine if follower stats sync is needed (logic copied and adapted from process_and_store_bubble_token) follower_stats_sync_is_needed_now = False fs_df_current = token_state.get("bubble_follower_stats_df", pd.DataFrame()).copy() # Work with a copy if fs_df_current.empty: follower_stats_sync_is_needed_now = True logging.info("Follower stats sync needed: Bubble DF is empty.") else: monthly_gains_df = fs_df_current[fs_df_current[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].copy() if monthly_gains_df.empty or FOLLOWER_STATS_CATEGORY_COLUMN not in monthly_gains_df.columns: follower_stats_sync_is_needed_now = True logging.info("Follower stats sync needed: Monthly gains data missing or date column absent.") else: monthly_gains_df.loc[:, FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.normalize() last_gain_date = monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN].dropna().max() start_of_current_month = pd.Timestamp('now', tz='UTC').normalize().replace(day=1) if pd.isna(last_gain_date) or last_gain_date < start_of_current_month: follower_stats_sync_is_needed_now = True logging.info(f"Follower stats sync needed: Last gain date {last_gain_date} is old or invalid.") if fs_df_current[fs_df_current[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty: follower_stats_sync_is_needed_now = True logging.info("Follower stats sync needed: Demographic data (non-monthly) is missing.") if not follower_stats_sync_is_needed_now: logging.info("Follower stats data is fresh based on current check. No API fetch needed.") return "Follower Stats: Data up-to-date. 
", token_state logging.info(f"Follower stats sync proceeding for org_urn: {org_urn}") try: # This function should return a list of dicts, each dict representing a stat entry api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn) if not api_follower_stats: # api_follower_stats could be None or empty list logging.info(f"Follower Stats sync: No stats found via API for org {org_urn}.") return "Follower Stats: None found via API. ", token_state bubble_follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame()).copy() new_stats_to_upload = [] # --- Process Monthly Gains --- api_monthly_gains = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) == 'follower_gains_monthly'] existing_monthly_gain_dates = set() if not bubble_follower_stats_df_orig.empty: bubble_monthly_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'] if FOLLOWER_STATS_CATEGORY_COLUMN in bubble_monthly_df.columns: # Ensure dates are strings for set comparison, handle potential NaNs from to_datetime if any existing_monthly_gain_dates = set(bubble_monthly_df[FOLLOWER_STATS_CATEGORY_COLUMN].astype(str).unique()) for gain_stat in api_monthly_gains: # category_name for monthly gains is 'YYYY-MM-DD' string from linkedin_follower_stats if str(gain_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN)) not in existing_monthly_gain_dates: new_stats_to_upload.append(gain_stat) # --- Process Demographics (add if new or different counts) --- api_demographics = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) != 'follower_gains_monthly'] # Create a map of existing demographics for quick lookup and comparison # Key: (org_urn, type, category_name) -> (organic_count, paid_count) existing_demographics_map = {} if not bubble_follower_stats_df_orig.empty: bubble_demographics_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] != 
'follower_gains_monthly'] if not bubble_demographics_df.empty and \ all(col in bubble_demographics_df.columns for col in [ FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN ]): for _, row in bubble_demographics_df.iterrows(): key = ( str(row[FOLLOWER_STATS_ORG_URN_COLUMN]), str(row[FOLLOWER_STATS_TYPE_COLUMN]), str(row[FOLLOWER_STATS_CATEGORY_COLUMN]) ) existing_demographics_map[key] = ( row[FOLLOWER_STATS_ORGANIC_COLUMN], row[FOLLOWER_STATS_PAID_COLUMN] ) for demo_stat in api_demographics: key = ( str(demo_stat.get(FOLLOWER_STATS_ORG_URN_COLUMN)), str(demo_stat.get(FOLLOWER_STATS_TYPE_COLUMN)), str(demo_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN)) ) api_counts = ( demo_stat.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0), demo_stat.get(FOLLOWER_STATS_PAID_COLUMN, 0) ) if key not in existing_demographics_map or existing_demographics_map[key] != api_counts: new_stats_to_upload.append(demo_stat) if not new_stats_to_upload: logging.info(f"Follower Stats sync: Data for org {org_urn} is up-to-date or no changes found.") return "Follower Stats: Data up-to-date or no changes. 
", token_state bulk_upload_to_bubble(new_stats_to_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME) logging.info(f"Successfully uploaded {len(new_stats_to_upload)} follower stat entries to Bubble for org {org_urn}.") # Update in-memory DataFrame: Concatenate old and new, then drop duplicates strategically temp_df = pd.concat([bubble_follower_stats_df_orig, pd.DataFrame(new_stats_to_upload)], ignore_index=True) # For monthly gains, unique by org, date (category_name) monthly_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].drop_duplicates( subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN], keep='last' # Keep the newest entry if dates somehow collide (shouldn't with current logic) ) # For demographics, unique by org, type, and category_name demographics_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].drop_duplicates( subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN], keep='last' # This ensures that if a demographic was "updated", the new version is kept ) token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True) return f"Follower Stats: Synced {len(new_stats_to_upload)} entries. ", token_state except ValueError as ve: # Catch specific errors if your API calls raise them logging.error(f"ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True) return f"Follower Stats Error: {html.escape(str(ve))}. ", token_state except Exception as e: logging.exception(f"Unexpected error in sync_linkedin_follower_stats for {org_urn}.") # Logs full traceback return f"Follower Stats: Unexpected error ({type(e).__name__}). 
", token_state def sync_all_linkedin_data(token_state): """Orchestrates the syncing of all LinkedIn data types (Posts, Mentions, Follower Stats).""" logging.info("Starting sync_all_linkedin_data process.") if not token_state or not token_state.get("token"): logging.error("Sync All: Access denied. LinkedIn token not available.") return "
â Access denied. LinkedIn token not available.
", token_state client_id = token_state.get("client_id") token_dict = token_state.get("token") org_urn = token_state.get('org_urn') fetch_count_for_posts_api = token_state.get('fetch_count_for_api', 0) # Operate on copies to avoid modifying original DFs in state directly until the end bubble_posts_df_orig = token_state.get("bubble_posts_df", pd.DataFrame()).copy() posts_sync_message = "" mentions_sync_message = "" follower_stats_sync_message = "" if not org_urn: logging.error("Sync All: Org URN missing in token_state.") return "â Config error: Org URN missing.
", token_state if not client_id or client_id == "ENV VAR MISSING": logging.error("Sync All: Client ID missing or not set.") return "â Config error: Client ID missing.
", token_state # --- Sync Posts --- if fetch_count_for_posts_api == 0: posts_sync_message = "Posts: Already up-to-date. " logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0.") else: logging.info(f"Posts sync: Starting fetch for {fetch_count_for_posts_api} posts.") try: # fetch_linkedin_posts_core is expected to return: (processed_raw_posts, stats_map, errors_list) processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_for_posts_api) if not processed_raw_posts: posts_sync_message = "Posts: None found via API. " logging.info("Posts sync: No raw posts returned from API.") else: existing_post_urns = set() if not bubble_posts_df_orig.empty and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df_orig.columns: existing_post_urns = set(bubble_posts_df_orig[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str)) # Filter out posts already in Bubble new_raw_posts = [p for p in processed_raw_posts if str(p.get(LINKEDIN_POST_URN_KEY)) not in existing_post_urns] if not new_raw_posts: posts_sync_message = "Posts: All fetched already in Bubble. 
" logging.info("Posts sync: All fetched posts were already found in Bubble.") else: logging.info(f"Posts sync: Processing {len(new_raw_posts)} new raw posts.") post_urns_to_process = [p[LINKEDIN_POST_URN_KEY] for p in new_raw_posts if p.get(LINKEDIN_POST_URN_KEY)] all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map) sentiments_per_post = analyze_sentiment(all_comments_data) # Assumes analysis of comments detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post) # Compiles with stats and sentiment # prepare_data_for_bubble should return tuple: (posts_for_bubble, post_stats_for_bubble, post_comments_for_bubble) li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data) if li_posts: # If there are posts to upload bulk_upload_to_bubble(li_posts, "LI_posts") # Update in-memory DataFrame for posts updated_posts_df = pd.concat([bubble_posts_df_orig, pd.DataFrame(li_posts)], ignore_index=True) token_state["bubble_posts_df"] = updated_posts_df.drop_duplicates(subset=[BUBBLE_POST_URN_COLUMN_NAME], keep='last') logging.info(f"Posts sync: Uploaded {len(li_posts)} new posts to Bubble.") if li_post_stats: bulk_upload_to_bubble(li_post_stats, "LI_post_stats") logging.info(f"Posts sync: Uploaded {len(li_post_stats)} post_stats entries.") # Note: Consider how/if to update a local stats_df in token_state if you maintain one. if li_post_comments: bulk_upload_to_bubble(li_post_comments, "LI_post_comments") logging.info(f"Posts sync: Uploaded {len(li_post_comments)} post_comments entries.") # Note: Consider how/if to update a local comments_df in token_state. posts_sync_message = f"Posts: Synced {len(li_posts)} new. " else: posts_sync_message = "Posts: No new ones to upload after processing. 
" logging.info("Posts sync: No new posts were prepared for Bubble upload.") except ValueError as ve: # Catch specific errors from your API calls posts_sync_message = f"Posts Error: {html.escape(str(ve))}. " logging.error(f"Posts sync: ValueError: {ve}", exc_info=True) except Exception as e: logging.exception("Posts sync: Unexpected error during processing.") # Logs full traceback posts_sync_message = f"Posts: Unexpected error ({type(e).__name__}). " # --- Sync Mentions --- # The sync_linkedin_mentions function updates token_state["bubble_mentions_df"] internally mentions_sync_message, token_state = sync_linkedin_mentions(token_state) # --- Sync Follower Stats --- # The sync_linkedin_follower_stats function updates token_state["bubble_follower_stats_df"] internally follower_stats_sync_message, token_state = sync_linkedin_follower_stats(token_state) logging.info(f"Sync process complete. Messages: Posts: [{posts_sync_message.strip()}], Mentions: [{mentions_sync_message.strip()}], Follower Stats: [{follower_stats_sync_message.strip()}]") final_message = f"â Sync Attempted. {posts_sync_message} {mentions_sync_message} {follower_stats_sync_message}
def display_main_dashboard(token_state):
    """Generate the HTML for the main dashboard from data cached in token_state.

    Renders three sections from the Bubble-synced DataFrames held in state:
    recent posts, recent mentions, and follower statistics.

    NOTE(review): reconstructed from a line-collapsed source in which the HTML
    markup inside string literals had been stripped. The wrapper tags
    (<p>, <h2>, ...) and the column lists below are plausible reconstructions —
    confirm against the original file.

    Args:
        token_state: dict-like app state; must contain a truthy "token" and may
            contain "bubble_posts_df", "bubble_mentions_df",
            "bubble_follower_stats_df" (pandas DataFrames).

    Returns:
        str: an HTML fragment for the dashboard, or an access-denied message
        when no token is present.
    """
    if not token_state or not token_state.get("token"):
        logging.warning("Dashboard display: Access denied. No token available.")
        return "❌ Access denied. No token available for dashboard."

    html_parts = ["<h1>📊 LinkedIn Organization Dashboard</h1>"]  # header text lost in source — TODO confirm

    # --- Posts section ---
    posts_df = token_state.get("bubble_posts_df", pd.DataFrame())
    html_parts.append(f"<h2>📝 Recent Posts ({len(posts_df)} in Bubble)</h2>")
    if not posts_df.empty:
        # Candidate display columns; the original list was lost — TODO confirm.
        cols_to_show_posts = [
            col for col in [BUBBLE_POST_URN_COLUMN_NAME, BUBBLE_POST_DATE_COLUMN_NAME,
                            "li_eb_label", "media_type", "sentiment"]
            if col in posts_df.columns
        ]
        if not cols_to_show_posts:
            html_parts.append("<p>No relevant post columns found to display.</p>")
        else:
            display_df_posts = posts_df.copy()
            if BUBBLE_POST_DATE_COLUMN_NAME in display_df_posts.columns:
                try:
                    # Format date and sort newest-first for display.
                    display_df_posts[BUBBLE_POST_DATE_COLUMN_NAME] = pd.to_datetime(
                        display_df_posts[BUBBLE_POST_DATE_COLUMN_NAME], errors='coerce'
                    ).dt.strftime('%Y-%m-%d %H:%M')
                    display_df_posts = display_df_posts.sort_values(
                        by=BUBBLE_POST_DATE_COLUMN_NAME, ascending=False)
                except Exception as e:
                    logging.error(f"Error formatting post dates for display: {e}")
                    html_parts.append("<p>Error formatting post dates.</p>")
            # Use escape=False if 'text' or 'summary_text' can contain HTML; otherwise, True is safer.
            # Assuming 'text' might have HTML from LinkedIn, using escape=False. Be cautious with this.
            html_parts.append(display_df_posts[cols_to_show_posts].head().to_html(
                escape=False, index=False, classes="table table-striped table-sm"))
    else:
        html_parts.append("<p>No posts loaded from Bubble.</p>")

    # --- Mentions section ---
    mentions_df = token_state.get("bubble_mentions_df", pd.DataFrame())
    html_parts.append(f"<h2>📣 Recent Mentions ({len(mentions_df)} in Bubble)</h2>")
    if not mentions_df.empty:
        # Candidate display columns; the original list was lost — TODO confirm.
        cols_to_show_mentions = [
            col for col in [BUBBLE_MENTIONS_DATE_COLUMN_NAME, "mention_text", "sentiment_label"]
            if col in mentions_df.columns
        ]
        if not cols_to_show_mentions:
            html_parts.append("<p>No relevant mention columns found to display.</p>")
        else:
            display_df_mentions = mentions_df.copy()
            if BUBBLE_MENTIONS_DATE_COLUMN_NAME in display_df_mentions.columns:
                try:
                    display_df_mentions[BUBBLE_MENTIONS_DATE_COLUMN_NAME] = pd.to_datetime(
                        display_df_mentions[BUBBLE_MENTIONS_DATE_COLUMN_NAME], errors='coerce'
                    ).dt.strftime('%Y-%m-%d %H:%M')
                    display_df_mentions = display_df_mentions.sort_values(
                        by=BUBBLE_MENTIONS_DATE_COLUMN_NAME, ascending=False)
                except Exception as e:
                    logging.error(f"Error formatting mention dates for display: {e}")
                    html_parts.append("<p>Error formatting mention dates.</p>")
            # Assuming "mention_text" can have HTML.
            html_parts.append(display_df_mentions[cols_to_show_mentions].head().to_html(
                escape=False, index=False, classes="table table-striped table-sm"))
    else:
        html_parts.append("<p>No mentions loaded from Bubble.</p>")

    # --- Follower statistics section ---
    follower_stats_df = token_state.get("bubble_follower_stats_df", pd.DataFrame())
    html_parts.append(f"<h2>📈 Follower Statistics ({len(follower_stats_df)} entries in Bubble)</h2>")
    if not follower_stats_df.empty:
        # Monthly gains rows use the type sentinel 'follower_gains_monthly' and
        # store a 'YYYY-MM-DD' date string in the category column (see constants).
        monthly_gains = follower_stats_df[
            follower_stats_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].copy()
        if not monthly_gains.empty and FOLLOWER_STATS_CATEGORY_COLUMN in monthly_gains.columns:
            try:
                monthly_gains[FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(
                    monthly_gains[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce')
                monthly_gains = monthly_gains.dropna(subset=[FOLLOWER_STATS_CATEGORY_COLUMN])
                if not monthly_gains.empty:
                    monthly_gains = monthly_gains.sort_values(
                        by=FOLLOWER_STATS_CATEGORY_COLUMN, ascending=False)
                    html_parts.append(monthly_gains.head().to_html(
                        index=False, classes="table table-sm"))
                else:
                    html_parts.append("<p>No valid monthly follower gain data to display after processing.</p>")
            except Exception as e:
                logging.error(f"Error formatting follower gain dates for display: {e}")
                html_parts.append("<p>Error displaying monthly follower gain data.</p>")
        else:
            html_parts.append("<p>No monthly follower gain data or required columns are missing.</p>")
        # Count of Demographic Entries (everything that is not the monthly-gains series).
        demographics_count = len(
            follower_stats_df[follower_stats_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'])
        html_parts.append(
            f"<p>Total demographic entries (seniority, industry, etc.): {demographics_count}</p>")
    else:
        html_parts.append("<p>No follower statistics loaded from Bubble.</p>")

    return "\n".join(html_parts)
def run_mentions_tab_display(token_state):
    """Generate HTML and a sentiment-distribution plot for the Mentions tab.

    NOTE(review): reconstructed from a line-collapsed source; the function's
    original `def` line and some literals were lost. Headings, the display
    column list, and the access-denied message are plausible reconstructions —
    confirm against the original file.

    Args:
        token_state: dict-like app state with "token" and "bubble_mentions_df".

    Returns:
        tuple[str, matplotlib.figure.Figure | None]: (HTML fragment, plot or None).
    """
    logging.info("Updating Mentions Tab display.")
    if not token_state or not token_state.get("token"):
        logging.warning("Mentions tab: Access denied. No token.")
        return "❌ Access denied. No token available for mentions.", None

    mentions_df = token_state.get("bubble_mentions_df", pd.DataFrame())
    if mentions_df.empty:
        logging.info("Mentions tab: No mentions data in Bubble.")
        return "<p>No mentions data in Bubble. Try syncing.</p>", None

    html_parts = ["<h3>Recent Mentions</h3>"]  # heading lost in source — TODO confirm

    mentions_df_display = mentions_df.copy()
    if BUBBLE_MENTIONS_DATE_COLUMN_NAME in mentions_df_display.columns:
        try:
            mentions_df_display[BUBBLE_MENTIONS_DATE_COLUMN_NAME] = pd.to_datetime(
                mentions_df_display[BUBBLE_MENTIONS_DATE_COLUMN_NAME], errors='coerce'
            ).dt.strftime('%Y-%m-%d %H:%M')
            mentions_df_display = mentions_df_display.sort_values(
                by=BUBBLE_MENTIONS_DATE_COLUMN_NAME, ascending=False)
        except Exception as e:
            logging.error(f"Error formatting mention dates for display: {e}")
            html_parts.append("<p>Error formatting mention dates.</p>")

    # Original column list lost — TODO confirm against Bubble schema.
    display_columns = [c for c in [BUBBLE_MENTIONS_DATE_COLUMN_NAME, "mention_text", "sentiment_label"]
                       if c in mentions_df_display.columns]
    if not display_columns or mentions_df_display[display_columns].empty:
        # Check if display_df is empty after potential sort/filter.
        html_parts.append("<p>Required columns for mentions display are missing or no data after processing.</p>")
    else:
        # Assuming "mention_text" might contain HTML.
        html_parts.append(mentions_df_display[display_columns].head(20).to_html(
            escape=False, index=False, classes="table table-sm"))

    mentions_html_output = "\n".join(html_parts)

    fig = None  # Initialize fig to None
    if not mentions_df.empty and "sentiment_label" in mentions_df.columns:
        try:
            import matplotlib.pyplot as plt
            plt.switch_backend('Agg')  # Essential for Gradio (headless rendering)
            fig_plot, ax = plt.subplots(figsize=(6, 4))  # Create figure and axes
            sentiment_counts = mentions_df["sentiment_label"].value_counts()
            sentiment_counts.plot(kind='bar', ax=ax,
                                  color=['#4CAF50', '#FFC107', '#F44336', '#9E9E9E', '#2196F3'])
            ax.set_title("Mention Sentiment Distribution")
            ax.set_ylabel("Count")
            plt.xticks(rotation=45, ha='right')
            plt.tight_layout()  # Adjust layout to prevent labels from overlapping
            fig = fig_plot  # Assign the figure to fig
            logging.info("Mentions tab: Sentiment distribution plot generated.")
        except Exception as e:
            logging.error(f"Error generating mentions plot: {e}", exc_info=True)
            fig = None  # Ensure fig is None on error
    else:
        logging.info("Mentions tab: Not enough data or 'sentiment_label' column missing for plot.")

    return mentions_html_output, fig


def run_follower_stats_tab_display(token_state):
    """Generate HTML and three plots for the Follower Stats tab.

    NOTE(review): reconstructed from a line-collapsed source; only the section
    error/fallback strings survived, so the plotting logic below (bar charts of
    organic vs paid counts, top-10 selection by organic count) is a plausible
    reconstruction — confirm against the original file.

    Args:
        token_state: dict-like app state with "token" and "bubble_follower_stats_df".

    Returns:
        tuple: (HTML fragment, monthly-gains plot, seniority plot, industry plot);
        plots are None when unavailable.
    """
    logging.info("Updating Follower Stats Tab display.")
    if not token_state or not token_state.get("token"):
        logging.warning("Follower stats tab: Access denied. No token.")
        return ("❌ Access denied. No token available for follower stats.", None, None, None)

    follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame())
    if follower_stats_df_orig.empty:
        logging.info("Follower stats tab: No follower stats data in Bubble.")
        return ("<p>No follower stats data in Bubble. Try syncing.</p>", None, None, None)

    follower_stats_df = follower_stats_df_orig.copy()  # Work with a copy
    html_parts = ["<h3>Follower Statistics</h3>"]  # heading lost in source — TODO confirm
    plot_monthly_gains = None
    plot_seniority = None
    plot_industry = None

    def _bar_plot(df, title):
        # Local helper: bar chart of organic vs paid counts per category.
        import matplotlib.pyplot as plt
        plt.switch_backend('Agg')  # headless rendering for Gradio
        fig, ax = plt.subplots(figsize=(6, 4))
        df.plot(kind='bar', x=FOLLOWER_STATS_CATEGORY_COLUMN,
                y=[FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN], ax=ax)
        ax.set_title(title)
        ax.set_ylabel("Followers")
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()
        return fig

    # --- Monthly follower gains ---
    try:
        monthly = follower_stats_df[
            follower_stats_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].copy()
        if not monthly.empty and FOLLOWER_STATS_CATEGORY_COLUMN in monthly.columns:
            # Category column holds a 'YYYY-MM-DD' date string for this series.
            monthly[FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(
                monthly[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce')
            monthly = monthly.dropna(subset=[FOLLOWER_STATS_CATEGORY_COLUMN]).sort_values(
                by=FOLLOWER_STATS_CATEGORY_COLUMN)
            if not monthly.empty:
                plot_monthly_gains = _bar_plot(monthly, "Monthly Follower Gains")
            else:
                html_parts.append("<p>No monthly follower gain data available or required columns missing.</p>")
        else:
            html_parts.append("<p>No monthly follower gain data available or required columns missing.</p>")
    except Exception as e:
        logging.error(f"Error displaying monthly follower gain data: {e}", exc_info=True)
        html_parts.append("<p>Error displaying monthly follower gain data.</p>")

    # --- Followers by seniority (top 10 organic) ---
    try:
        seniority = follower_stats_df[
            follower_stats_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_seniority'].copy()
        if not seniority.empty and FOLLOWER_STATS_ORGANIC_COLUMN in seniority.columns:
            seniority = seniority.sort_values(
                by=FOLLOWER_STATS_ORGANIC_COLUMN, ascending=False).head(10)
            plot_seniority = _bar_plot(seniority, "Followers by Seniority (Top 10 Organic)")
        else:
            html_parts.append("<p>No follower seniority data available or required columns missing.</p>")
    except Exception as e:
        logging.error(f"Error displaying follower seniority data: {e}", exc_info=True)
        html_parts.append("<p>Error displaying follower seniority data.</p>")

    # --- Followers by industry (top 10 organic) ---
    try:
        industry = follower_stats_df[
            follower_stats_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_industry'].copy()
        if not industry.empty and FOLLOWER_STATS_ORGANIC_COLUMN in industry.columns:
            industry = industry.sort_values(
                by=FOLLOWER_STATS_ORGANIC_COLUMN, ascending=False).head(10)
            plot_industry = _bar_plot(industry, "Followers by Industry (Top 10 Organic)")
        else:
            html_parts.append("<p>No follower industry data available or required columns missing.</p>")
    except Exception as e:
        logging.error(f"Error displaying follower industry data: {e}", exc_info=True)
        html_parts.append("<p>Error displaying follower industry data.</p>")

    return "\n".join(html_parts), plot_monthly_gains, plot_seniority, plot_industry
# ----------------------------------------------------------------------------
# Gradio application layout and entry point.
#
# NOTE(review): the opening of the Blocks context and the creation lines for
# Tab 1's components (token_state, url_user_token_display, org_urn_display,
# status_box, sync_data_btn) were lost in the corrupted source; they are
# reconstructed here from the names used in the visible event wiring below —
# confirm labels/visibility against the original file.
# ----------------------------------------------------------------------------
with gr.Blocks(title="LinkedIn Organization Dashboard") as app:
    # Shared app state and hidden components fed from URL parameters.
    token_state = gr.State(value={})
    url_user_token_display = gr.Textbox(label="User Token (from URL)", visible=False, interactive=False)
    org_urn_display = gr.Textbox(label="Organization URN (from URL)", visible=False, interactive=False)
    status_box = gr.Textbox(label="Status", interactive=False)

    with gr.Tabs():
        with gr.TabItem("1️⃣ Dashboard & Sync"):
            sync_data_btn = gr.Button("🔄 Sync LinkedIn Data", visible=False, interactive=False)
            sync_status_html_output = gr.HTML("<p>Sync status will appear here.</p>")
            dashboard_display_html = gr.HTML("<p>Dashboard loading...</p>")

            # Chain of events for initial load:
            # 1. app.load gets URL params.
            # 2. org_urn_display.change triggers initial_load_sequence.
            #    This populates token_state, updates sync button, and loads initial dashboard.
            org_urn_display.change(
                fn=initial_load_sequence,
                inputs=[url_user_token_display, org_urn_display, token_state],
                outputs=[status_box, token_state, sync_data_btn, dashboard_display_html],
                show_progress="full"
            )

            # When Sync button is clicked:
            # 1. sync_all_linkedin_data: Fetches from LinkedIn, uploads to Bubble, updates token_state DFs.
            # 2. process_and_store_bubble_token: Re-evaluates sync needs (button should now say "Up-to-date").
            # 3. display_main_dashboard: Refreshes dashboard with newly synced data.
            sync_data_btn.click(
                fn=sync_all_linkedin_data,
                inputs=[token_state],
                outputs=[sync_status_html_output, token_state],  # token_state is updated here
                show_progress="full"
            ).then(
                fn=process_and_store_bubble_token,  # Re-check sync status and update button
                inputs=[url_user_token_display, org_urn_display, token_state],
                outputs=[status_box, token_state, sync_data_btn],  # token_state updated again
                show_progress=False
            ).then(
                fn=display_main_dashboard,  # Refresh dashboard display
                inputs=[token_state],
                outputs=[dashboard_display_html],
                show_progress=False
            )

        with gr.TabItem("2️⃣ Analytics"):
            fetch_analytics_btn = gr.Button("📈 Fetch/Refresh Full Analytics", variant="primary")
            # Analytics outputs
            follower_count_md = gr.Markdown("Analytics data will load here...")
            with gr.Row():
                follower_plot, growth_plot = gr.Plot(label="Follower Demographics"), gr.Plot(label="Follower Growth")
            with gr.Row():
                eng_rate_plot = gr.Plot(label="Engagement Rate")
            with gr.Row():
                interaction_plot = gr.Plot(label="Post Interactions")
            with gr.Row():
                eb_plot = gr.Plot(label="Engagement Benchmark")
            with gr.Row():
                mentions_vol_plot, mentions_sentiment_plot = gr.Plot(label="Mentions Volume"), gr.Plot(label="Mentions Sentiment")
            fetch_analytics_btn.click(
                fn=guarded_fetch_analytics,
                inputs=[token_state],
                outputs=[follower_count_md, follower_plot, growth_plot, eng_rate_plot,
                         interaction_plot, eb_plot, mentions_vol_plot, mentions_sentiment_plot],
                show_progress="full"
            )

        with gr.TabItem("3️⃣ Mentions"):
            refresh_mentions_display_btn = gr.Button("🔄 Refresh Mentions Display (from local data)", variant="secondary")
            mentions_html = gr.HTML("Mentions data loads from Bubble after sync. Click refresh to view current local data.")
            mentions_sentiment_dist_plot = gr.Plot(label="Mention Sentiment Distribution")
            refresh_mentions_display_btn.click(
                fn=run_mentions_tab_display,
                inputs=[token_state],
                outputs=[mentions_html, mentions_sentiment_dist_plot],
                show_progress="full"
            )

        with gr.TabItem("4️⃣ Follower Stats"):
            refresh_follower_stats_btn = gr.Button("🔄 Refresh Follower Stats Display (from local data)", variant="secondary")
            follower_stats_html = gr.HTML("Follower statistics load from Bubble after sync. Click refresh to view current local data.")
            with gr.Row():
                fs_plot_monthly_gains = gr.Plot(label="Monthly Follower Gains")
            with gr.Row():
                fs_plot_seniority = gr.Plot(label="Followers by Seniority (Top 10 Organic)")
                fs_plot_industry = gr.Plot(label="Followers by Industry (Top 10 Organic)")
            refresh_follower_stats_btn.click(
                fn=run_follower_stats_tab_display,
                inputs=[token_state],
                outputs=[follower_stats_html, fs_plot_monthly_gains, fs_plot_seniority, fs_plot_industry],
                show_progress="full"
            )

if __name__ == "__main__":
    # Check for essential environment variables so misconfiguration is visible
    # at startup rather than on first API call.
    if not os.environ.get("Linkedin_client_id"):
        logging.warning("WARNING: 'Linkedin_client_id' environment variable not set. "
                        "The app may not function correctly for LinkedIn API calls.")
    if not os.environ.get("BUBBLE_APP_NAME") or \
       not os.environ.get("BUBBLE_API_KEY_PRIVATE") or \
       not os.environ.get("BUBBLE_API_ENDPOINT"):
        logging.warning("WARNING: One or more Bubble environment variables "
                        "(BUBBLE_APP_NAME, BUBBLE_API_KEY_PRIVATE, BUBBLE_API_ENDPOINT) are not set. "
                        "Bubble integration will fail.")
    try:
        import matplotlib
        logging.info(f"Matplotlib version: {matplotlib.__version__} found.")
    except ImportError:
        logging.error("Matplotlib is not installed. Plots will not be generated. Please install it: pip install matplotlib")
    # Launch the Gradio app
    app.launch(server_name="0.0.0.0", server_port=7860, debug=True)  # debug=True for more verbose logging from Gradio