import json
import requests
import logging
from datetime import datetime, timezone, timedelta
# Assuming you have a sessions.py with create_session
# If sessions.py or create_session is not found, it will raise an ImportError,
# which is appropriate for a module that depends on it.
from sessions import create_session
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"
LINKEDIN_API_VERSION = "202502"  # Versioned-API header value required by the follower statistics endpoints
# --- ID to Name Mapping Helper Functions ---
def _fetch_linkedin_names(session, url, params, result_key_path, name_key_path, id_key="id"):
"""
Generic helper to fetch and map IDs to names from a LinkedIn API endpoint.
result_key_path: list of keys to navigate to the list of items (e.g., ["elements"])
name_key_path: list of keys to navigate to the name within an item (e.g., ["name", "localized", "en_US"])
"""
mapping = {}
try:
logging.debug(f"Fetching names from URL: {url} with params: {json.dumps(params)}") # Log params for clarity
response = session.get(url, params=params)
response.raise_for_status()
data = response.json()
items = data
for key in result_key_path:
if isinstance(items, dict):
items = items.get(key, [])
else:
logging.warning(f"Expected dict to get key '{key}' but got {type(items)} at path {result_key_path} for URL {url}. Check result_key_path.")
return mapping
if isinstance(items, dict):
for item_id_str, item_data in items.items():
name = item_data
for key_nav in name_key_path:
if isinstance(name, dict):
name = name.get(key_nav)
else:
name = None
break
if name:
mapping[item_id_str] = name
else:
logging.warning(f"No name found for ID {item_id_str} at path {name_key_path} in item: {item_data} from URL {url}")
elif isinstance(items, list):
for item in items:
item_id_val = item.get(id_key)
name = item
for key_nav in name_key_path:
if isinstance(name, dict):
name = name.get(key_nav)
else:
name = None
break
if item_id_val is not None and name:
mapping[str(item_id_val)] = name
else:
logging.warning(f"No ID ('{id_key}') or name found at path {name_key_path} in item: {item} from URL {url}")
else:
logging.warning(f"Expected list or dict of items at {result_key_path} from URL {url}, got {type(items)}")
except requests.exceptions.RequestException as e:
status_code = getattr(e.response, 'status_code', 'N/A')
error_text = getattr(e.response, 'text', str(e)) # Log the raw error text
logging.error(f"Error fetching names from {url} (Status: {status_code}): {error_text}")
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON for names from {url}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
except Exception as e:
logging.error(f"Unexpected error fetching names from {url}: {e}", exc_info=True)
return mapping
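# Illustrative note on the two key paths (hypothetical, trimmed payload — not an
# exact API response): given
#   data = {"elements": [{"id": 1, "name": {"localized": {"en_US": "Accounting"}}}]}
# calling _fetch_linkedin_names with result_key_path=["elements"] and
# name_key_path=["name", "localized", "en_US"] first walks to the item list,
# then into each item, and yields mapping == {"1": "Accounting"}.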
def get_functions_map(session):
"""Fetches all LinkedIn functions and returns a map of {id: name}."""
url = f"{API_V2_BASE}/functions"
params = {} # Relies on Accept-Language header from session
logging.info("Fetching all LinkedIn functions.")
return _fetch_linkedin_names(session, url, params, ["elements"], ["name", "localized", "en_US"], "id")
def get_seniorities_map(session):
"""Fetches all LinkedIn seniorities and returns a map of {id: name}."""
url = f"{API_V2_BASE}/seniorities"
params = {} # Relies on Accept-Language header from session
logging.info("Fetching all LinkedIn seniorities.")
return _fetch_linkedin_names(session, url, params, ["elements"], ["name", "localized", "en_US"], "id")
def get_industries_map(session, industry_urns, version="DEFAULT"):
"""Fetches names for a list of industry URNs by pulling ALL industries and filtering locally."""
# parse and dedupe IDs
industry_ids = [_parse_urn_to_id(urn) for urn in industry_urns or []]
unique_ids = set(filter(None, industry_ids))
if not unique_ids:
return {}
    # Fetch the full list in a single request; LinkedIn defaults count to 10, so bump it.
    url = f"{API_V2_BASE}/industryTaxonomyVersions/{version}/industries"
    params = {
        'start': 0,
        'count': 500  # should comfortably exceed the total number of industries
    }
logging.info(f"Fetching all industries (to filter {len(unique_ids)} IDs) from {url}")
try:
response = session.get(url, params=params)
response.raise_for_status()
data = response.json()
elements = data.get('elements', [])
mapping = {}
for el in elements:
el_id = el.get('id')
if el_id and str(el_id) in unique_ids:
# drill into name.localized.en_US
name = el.get('name', {}) \
.get('localized', {}) \
.get('en_US')
if name:
mapping[str(el_id)] = name
else:
logging.warning(f"Industry {el_id} has no en_US name field")
return mapping
except requests.exceptions.RequestException as e:
status_code = getattr(e.response, 'status_code', 'N/A')
logging.error(f"Error fetching all industries: {status_code}{getattr(e.response, 'text', str(e))}")
return {}
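# For reference, the loop above expects each element of the industries response
# to look roughly like this (hypothetical, trimmed example):
#   {"id": 43, "name": {"localized": {"en_US": "Banking"}}}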
def get_geo_map(session, geo_urns):
"""Fetches names for a list of geo URNs. Returns a map {id: name}."""
if not geo_urns: return {}
geo_ids = [_parse_urn_to_id(urn) for urn in geo_urns if urn]
unique_ids = list(set(filter(None, geo_ids)))
if not unique_ids: return {}
# As per LinkedIn docs for BATCH_GET: ids=List(12345,23456)&locale=(language:en,country:US)
ids_param_string = "List(" + ",".join(map(str, unique_ids)) + ")"
locale_param_string = "(language:en,country:US)" # Must be exactly this string format
# Parameters must be passed in the URL string directly for this specific API format
# The `params` dict for session.get() will be empty.
url = f"{API_V2_BASE}/geo?ids={ids_param_string}&locale={locale_param_string}"
logging.info(f"Fetching names for {len(unique_ids)} unique geo IDs using URL: {url}")
return _fetch_linkedin_names(session, url, {}, ["results"], ["defaultLocalizedName", "value"])
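# Example of the resulting BATCH_GET URL (geo IDs are illustrative):
#   https://api.linkedin.com/v2/geo?ids=List(103644278,101174742)&locale=(language:en,country:US)
# The Restli List(...) syntax must not be URL-encoded, which is why the query
# string is assembled by hand instead of being passed through the params dict.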
def _parse_urn_to_id(urn_string):
"""Helper to get the last part (ID) from a URN string."""
if not isinstance(urn_string, str):
logging.debug(f"Invalid URN type: {type(urn_string)}, value: {urn_string}. Cannot parse ID.")
return None
try:
return urn_string.split(':')[-1]
except IndexError:
logging.warning(f"Could not parse ID from URN: {urn_string}")
return None
except Exception as e:
logging.error(f"Unexpected error parsing URN {urn_string}: {e}")
return None
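# Example: the ID is the last colon-separated segment of a URN, e.g.
#   _parse_urn_to_id("urn:li:geo:103644278") -> "103644278"  (illustrative URN)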
# --- Follower Data Fetching Functions ---
def fetch_monthly_follower_gains(session, org_urn, api_rest_base):
"""
Fetches monthly follower gains for the last 12 full months.
The start date is set to the first day of the month, 12 months prior to the current month, at midnight UTC.
"""
    session.headers.update({'LinkedIn-Version': LINKEDIN_API_VERSION})
    # Start from the first day of the month roughly 12 months ago, at midnight UTC.
    now = datetime.now(timezone.utc)
    twelve_months_ago = (now - timedelta(days=365)).replace(
        day=1, hour=0, minute=0, second=0, microsecond=0
    )
    start_ms = int(twelve_months_ago.timestamp() * 1000)
    # Build the URL with the time interval parameters in an explicit query string.
    url = (
        f"{api_rest_base}/organizationalEntityFollowerStatistics"
        f"?q=organizationalEntity"
        f"&organizationalEntity={org_urn}"
        f"&timeIntervals.timeGranularityType=MONTH"
        f"&timeIntervals.timeRange.start={start_ms}"
        # LinkedIn defaults the end of the timeRange to the current time when it is omitted.
    )
    logging.info(f"Fetching monthly follower gains from URL: {url}")
results = []
try:
response = session.get(url)
response.raise_for_status() # Raises an HTTPError for bad responses (4XX or 5XX)
data = response.json()
elements = data.get('elements', [])
if not elements:
logging.info(f"No 'elements' found in API response for {org_urn} for start_ms {start_ms}.")
for item in elements:
time_range = item.get('timeRange', {})
ts = time_range.get('start')
if ts is None:
logging.warning(f"Skipping item due to missing 'start' timestamp: {item}")
continue
# Convert timestamp (milliseconds) to YYYY-MM-DD date string in UTC
date_obj = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
date_str = date_obj.strftime('%Y-%m-%d')
gains = item.get('followerGains', {})
# It's possible 'followerGains' itself is missing or None
if gains is None:
gains = {} # Ensure gains is a dict to prevent error on .get()
results.append({
'category_name': date_str, # This is the start date of the month's data
'follower_count_organic': gains.get('organicFollowerGain', 0),
'follower_count_paid': gains.get('paidFollowerGain', 0),
'follower_count_type': 'follower_gains_monthly',
'organization_urn': org_urn
})
logging.info(f"Fetched {len(results)} monthly follower entries for {org_urn} starting from {start_of_period.strftime('%Y-%m-%d')}.")
except requests.exceptions.HTTPError as http_err:
# More specific error for HTTP errors
code = getattr(http_err.response, 'status_code', 'N/A')
text = getattr(http_err.response, 'text', str(http_err))
logging.error(f"HTTP error fetching monthly gains for {org_urn}: {code} - {text}")
logging.error(f"Request URL: {url}")
except requests.exceptions.RequestException as e:
# Catch other request-related errors (e.g., connection issues)
code = getattr(e.response, 'status_code', 'N/A') if e.response is not None else 'N/A'
text = getattr(e.response, 'text', str(e)) if e.response is not None else str(e)
logging.error(f"Error fetching monthly gains for {org_urn}: {code} - {text}")
logging.error(f"Request URL: {url}")
except Exception as ex:
# Catch any other unexpected errors (e.g., JSON parsing if response is not JSON)
logging.error(f"An unexpected error occurred while fetching monthly gains for {org_urn}: {str(ex)}")
logging.error(f"Request URL: {url}")
return results
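# Each entry returned above has the following shape (values are illustrative,
# not real data):
#   {
#       "category_name": "2024-06-01",  # start date of the month's window
#       "follower_count_organic": 42,
#       "follower_count_paid": 7,
#       "follower_count_type": "follower_gains_monthly",
#       "organization_urn": "urn:li:organization:12345",
#   }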
def fetch_follower_demographics(session, org_urn, functions_map, seniorities_map):
"""
Fetches current follower demographics, applying Top-N for specified categories.
"""
final_demographics_results = []
# Parameters for the main demographics call
params = {
'q': 'organizationalEntity',
'organizationalEntity': org_urn
}
url = f"{API_REST_BASE}/organizationalEntityFollowerStatistics"
logging.info(f"Fetching follower demographics from: {url} for org URN {org_urn} with params: {json.dumps(params)}")
try:
response = session.get(url, params=params)
response.raise_for_status()
data = response.json()
elements = data.get("elements", [])
if not elements:
logging.warning(f"No elements found in follower demographics response for {org_urn}.")
return []
stat_element = elements[0]
def _get_entries_for_type(raw_items_list, type_name, id_map, id_field_name_in_item, org_urn_val):
current_type_entries = []
if not raw_items_list:
logging.debug(f"No raw items for demographic type '{type_name}' for org {org_urn_val}.")
return current_type_entries
for item in raw_items_list:
                # Resolve the item's URN to an ID, then map the ID to a display name.
urn_val = item.get(id_field_name_in_item)
entity_id = _parse_urn_to_id(urn_val)
# Use a more descriptive unknown if ID mapping fails
unknown_label_suffix = type_name.split('_')[-1].capitalize() if '_' in type_name else type_name.capitalize()
category_name_val = id_map.get(str(entity_id), f"Unknown {unknown_label_suffix} (ID: {entity_id if entity_id else urn_val})")
counts = item.get("followerCounts", {})
organic_count = counts.get("organicFollowerCount", 0)
paid_count = counts.get("paidFollowerCount", 0)
current_type_entries.append({
"category_name": category_name_val,
"follower_count_organic": organic_count,
"follower_count_paid": paid_count,
"follower_count_type": type_name,
"organization_urn": org_urn_val
})
return current_type_entries
industry_urns_to_map = [item.get("industry") for item in stat_element.get("followerCountsByIndustry", []) if item.get("industry")]
geo_urns_to_map = [item.get("geo") for item in stat_element.get("followerCountsByGeoCountry", []) if item.get("geo")]
live_industries_map = get_industries_map(session, industry_urns_to_map)
live_geo_map = get_geo_map(session, geo_urns_to_map)
demographic_configs = [
{"items_key": "followerCountsBySeniority", "type_name": "follower_seniority", "id_map": seniorities_map, "id_field": "seniority", "top_n": 10},
{"items_key": "followerCountsByFunction", "type_name": "follower_function", "id_map": functions_map, "id_field": "function", "top_n": 10},
{"items_key": "followerCountsByIndustry", "type_name": "follower_industry", "id_map": live_industries_map, "id_field": "industry", "top_n": 10},
{"items_key": "followerCountsByGeoCountry", "type_name": "follower_geo", "id_map": live_geo_map, "id_field": "geo", "top_n": 10}
]
for config in demographic_configs:
raw_items = stat_element.get(config["items_key"], [])
processed_entries = _get_entries_for_type(raw_items, config["type_name"], config["id_map"], config["id_field"], org_urn)
if config["top_n"] is not None and processed_entries:
for entry in processed_entries:
if not isinstance(entry.get("follower_count_organic"), (int, float)):
entry["follower_count_organic"] = 0
sorted_entries = sorted(processed_entries, key=lambda x: x.get("follower_count_organic", 0), reverse=True)
final_demographics_results.extend(sorted_entries[:config["top_n"]])
logging.debug(f"Added top {config['top_n']} for {config['type_name']}. Count: {len(sorted_entries[:config['top_n']])}")
else:
final_demographics_results.extend(processed_entries)
logging.debug(f"Added all for {config['type_name']}. Count: {len(processed_entries)}")
logging.info(f"Processed follower demographics for {org_urn}. Total entries from all types: {len(final_demographics_results)}")
except requests.exceptions.RequestException as e:
status_code = getattr(e.response, 'status_code', 'N/A')
error_text = getattr(e.response, 'text', str(e))
logging.error(f"Error fetching follower demographics for {org_urn} (Status: {status_code}): {error_text}")
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON for follower demographics for {org_urn}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
except Exception as e:
logging.error(f"Unexpected error fetching follower demographics for {org_urn}: {e}", exc_info=True)
return final_demographics_results
# --- Main Orchestration Function ---
def get_linkedin_follower_stats(comm_client_id, community_token, org_urn):
"""
Main function to fetch all follower statistics (monthly gains and demographics)
and format them for Bubble.
"""
if not all([comm_client_id, community_token, org_urn]):
logging.error("Client ID, token, or Organization URN is missing for get_linkedin_follower_stats.")
return []
token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
session = None
try:
session = create_session(comm_client_id, token=token_dict)
session.headers.update({
"X-Restli-Protocol-Version": "2.0.0",
"LinkedIn-Version": LINKEDIN_API_VERSION,
"Accept-Language": "en_US" # Explicitly set for v2 name lookups if not default in session
})
except Exception as e:
logging.error(f"Failed to create session or update headers for org {org_urn}: {e}", exc_info=True)
return []
logging.info(f"Starting follower stats retrieval for org: {org_urn}")
functions_map = get_functions_map(session)
seniorities_map = get_seniorities_map(session)
if not functions_map: logging.warning(f"Functions map is empty for org {org_urn}. Function names might not be resolved.")
if not seniorities_map: logging.warning(f"Seniorities map is empty for org {org_urn}. Seniority names might not be resolved.")
all_follower_data = []
demographics = fetch_follower_demographics(session, org_urn, functions_map, seniorities_map)
all_follower_data.extend(demographics)
    # Recreate the session with its default headers; the follower-gains call was
    # failing with the extra headers set above for the demographics requests.
    session = create_session(comm_client_id, token=token_dict)
monthly_gains = fetch_monthly_follower_gains(session, org_urn, API_REST_BASE)
all_follower_data.extend(monthly_gains)
logging.info(f"Successfully compiled {len(all_follower_data)} total follower stat entries for {org_urn}.")
return all_follower_data
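# Minimal usage sketch (the client ID, token, and organization URN below are
# placeholders, not working credentials):
#
#   if __name__ == "__main__":
#       stats = get_linkedin_follower_stats(
#           comm_client_id="YOUR_CLIENT_ID",
#           community_token="YOUR_ACCESS_TOKEN",
#           org_urn="urn:li:organization:12345",
#       )
#       print(json.dumps(stats, indent=2))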