Spaces:
Running
Running
import json | |
import requests | |
from sessions import create_session | |
from datetime import datetime, timezone, timedelta # Added timezone, timedelta | |
import matplotlib.pyplot as plt # Added for plotting | |
from error_handling import display_error | |
import gradio as gr | |
import traceback | |
import html | |
API_V2_BASE = 'https://api.linkedin.com/v2' | |
API_REST_BASE = "https://api.linkedin.com/rest" | |
def extract_follower_gains(data): | |
"""Extracts monthly follower gains from API response.""" | |
results = [] | |
print(f"Raw gains data received for extraction: {json.dumps(data, indent=2)}") # Debug print | |
elements = data.get("elements", []) | |
if not elements: | |
print("Warning: No 'elements' found in follower statistics response.") | |
return [] | |
for item in elements: | |
time_range = item.get("timeRange", {}) | |
start_timestamp = time_range.get("start") | |
if start_timestamp is None: | |
print("Warning: Skipping item due to missing start timestamp.") | |
continue | |
# Convert timestamp to YYYY-MM format for clearer labeling | |
# Use UTC timezone explicitly | |
try: | |
date_obj = datetime.fromtimestamp(start_timestamp / 1000, tz=timezone.utc) | |
# Format as Year-Month (e.g., 2024-03) | |
date_str = date_obj.strftime('%Y-%m') | |
except Exception as time_e: | |
print(f"Warning: Could not parse timestamp {start_timestamp}. Error: {time_e}. Skipping item.") | |
continue | |
follower_gains = item.get("followerGains", {}) | |
# Handle potential None values from API by defaulting to 0 | |
organic_gain = follower_gains.get("organicFollowerGain", 0) or 0 | |
paid_gain = follower_gains.get("paidFollowerGain", 0) or 0 | |
results.append({ | |
"date": date_str, # Store simplified date string | |
"organic": organic_gain, | |
"paid": paid_gain | |
}) | |
print(f"Extracted follower gains (unsorted): {results}") # Debug print | |
# Sort results by date string to ensure chronological order for plotting | |
try: | |
results.sort(key=lambda x: x['date']) | |
except Exception as sort_e: | |
print(f"Warning: Could not sort follower gains by date. Error: {sort_e}") | |
print(f"Extracted follower gains (sorted): {results}") | |
return results | |
def fetch_analytics_data(comm_client_id, comm_token): | |
"""Fetches org URN, follower count, and follower gains using the Marketing token.""" | |
print("--- Fetching Analytics Data ---") | |
if not comm_token: | |
raise ValueError("comm_token is missing.") | |
token_dict = comm_token if isinstance(comm_token, dict) else {'access_token': comm_token, 'token_type': 'Bearer'} | |
ln_mkt = create_session(comm_client_id, token=token_dict) | |
try: | |
# 1. Fetch Org URN and Name | |
print("Fetching Org URN for analytics...") | |
# This function already handles errors and raises ValueError | |
#org_urn, org_name = fetch_org_urn(token_dict) | |
org_urn, org_name = "urn:li:organization:19010008", "GRLS" | |
print(f"Analytics using Org: {org_name} ({org_urn})") | |
# 2. Fetch Follower Count (v2 API) | |
# Endpoint requires r_organization_social permission | |
print("Fetching follower count...") | |
count_url = f"{API_V2_BASE}/networkSizes/{org_urn}?edgeType=CompanyFollowedByMember" | |
print(f"Requesting follower count from: {count_url}") | |
resp_count = ln_mkt.get(count_url) | |
print(f"→ COUNT Response Status: {resp_count.status_code}") | |
print(f"→ COUNT Response Body: {resp_count.text}") | |
resp_count.raise_for_status() # Check for HTTP errors | |
count_data = resp_count.json() | |
# The follower count is in 'firstDegreeSize' | |
follower_count = count_data.get("firstDegreeSize", 0) | |
print(f"Follower count: {follower_count}") | |
# 3. Fetch Follower Gains (REST API) | |
# Endpoint requires r_organization_social permission | |
print("Fetching follower gains...") | |
# Calculate start date: 12 months ago, beginning of that month, UTC | |
now = datetime.now(timezone.utc) | |
# Go back roughly 365 days | |
twelve_months_ago = now - timedelta(days=365) | |
# Set to the first day of that month | |
start_of_period = twelve_months_ago.replace(day=1, hour=0, minute=0, second=0, microsecond=0) | |
start_ms = int(start_of_period.timestamp() * 1000) | |
print(f"Requesting gains starting from: {start_of_period.strftime('%Y-%m-%d %H:%M:%S %Z')} ({start_ms} ms)") | |
gains_url = ( | |
f"{API_REST_BASE}/organizationalEntityFollowerStatistics" | |
f"?q=organizationalEntity" | |
f"&organizationalEntity={org_urn}" | |
f"&timeIntervals.timeGranularityType=MONTH" | |
f"&timeIntervals.timeRange.start={start_ms}" | |
# No end date needed to get data up to the latest available month | |
) | |
print(f"Requesting gains from: {gains_url}") | |
resp_gains = ln_mkt.get(gains_url) | |
print(f"→ GAINS Request Headers: {resp_gains.request.headers}") | |
print(f"→ GAINS Response Status: {resp_gains.status_code}") | |
print(f"→ GAINS Response Body (first 500 chars): {resp_gains.text[:500]}") | |
resp_gains.raise_for_status() # Check for HTTP errors | |
gains_data = resp_gains.json() | |
# 4. Process Gains Data using the extraction function | |
follower_gains_list = extract_follower_gains(gains_data) | |
# Return all fetched data | |
return org_name, follower_count, follower_gains_list | |
except requests.exceptions.RequestException as e: | |
status = e.response.status_code if e.response is not None else "N/A" | |
details = "" | |
if e.response is not None: | |
try: | |
details = f" Details: {e.response.json()}" | |
except json.JSONDecodeError: | |
details = f" Response: {e.response.text[:200]}..." | |
print(f"ERROR fetching analytics data (Status: {status}).{details}") | |
# Re-raise a user-friendly error, including the original exception context | |
raise ValueError(f"Failed to fetch analytics data from LinkedIn API (Status: {status}). Check permissions (r_organization_social) and API status.") from e | |
except ValueError as ve: | |
# Catch ValueErrors raised by fetch_org_urn | |
print(f"ERROR during analytics data fetch (likely Org URN): {ve}") | |
raise ve # Re-raise the specific error message | |
except Exception as e: | |
print(f"UNEXPECTED ERROR processing analytics data: {e}") | |
tb = traceback.format_exc() | |
print(tb) | |
raise ValueError(f"An unexpected error occurred while fetching or processing analytics data.") from e | |
def plot_follower_gains(follower_data): | |
"""Generates a matplotlib plot for follower gains. Returns the figure object.""" | |
print(f"Plotting follower gains data: {follower_data}") | |
plt.style.use('seaborn-v0_8-whitegrid') # Use a nice style | |
if not follower_data: | |
print("No follower data to plot.") | |
# Create an empty plot with a message | |
fig, ax = plt.subplots(figsize=(10, 5)) | |
ax.text(0.5, 0.5, 'No follower gains data available for the last 12 months.', | |
horizontalalignment='center', verticalalignment='center', | |
transform=ax.transAxes, fontsize=12, color='grey') | |
ax.set_title('Monthly Follower Gains (Last 12 Months)') | |
ax.set_xlabel('Month') | |
ax.set_ylabel('Follower Gains') | |
# Remove ticks if there's no data | |
ax.set_xticks([]) | |
ax.set_yticks([]) | |
plt.tight_layout() | |
return fig # Return the figure object | |
try: | |
# Ensure data is sorted by date (should be done in extract_follower_gains, but double-check) | |
follower_data.sort(key=lambda x: x['date']) | |
dates = [entry['date'] for entry in follower_data] # Should be 'YYYY-MM' strings | |
organic_gains = [entry['organic'] for entry in follower_data] | |
paid_gains = [entry['paid'] for entry in follower_data] | |
# Create the plot | |
fig, ax = plt.subplots(figsize=(12, 6)) # Use fig, ax pattern | |
ax.plot(dates, organic_gains, label='Organic Follower Gain', marker='o', linestyle='-', color='#0073b1') # LinkedIn blue | |
ax.plot(dates, paid_gains, label='Paid Follower Gain', marker='x', linestyle='--', color='#d9534f') # Reddish color | |
# Customize the plot | |
ax.set_xlabel('Month (YYYY-MM)') | |
ax.set_ylabel('Number of New Followers') | |
ax.set_title('Monthly Follower Gains (Last 12 Months)') | |
# Improve x-axis label readability | |
# Show fewer labels if there are many months | |
tick_frequency = max(1, len(dates) // 10) # Show label roughly every N months | |
ax.set_xticks(dates[::tick_frequency]) | |
ax.tick_params(axis='x', rotation=45, labelsize=9) # Rotate and adjust size | |
ax.legend(title="Gain Type") | |
ax.grid(True, linestyle='--', alpha=0.6) # Lighter grid | |
# Add value labels on top of bars/points (optional, can get crowded) | |
# for i, (org, paid) in enumerate(zip(organic_gains, paid_gains)): | |
# if org > 0: ax.text(i, org, f'{org}', ha='center', va='bottom', fontsize=8) | |
# if paid > 0: ax.text(i, paid, f'{paid}', ha='center', va='bottom', fontsize=8, color='red') | |
plt.tight_layout() # Adjust layout to prevent labels from overlapping | |
print("Successfully generated follower gains plot.") | |
# Return the figure object for Gradio | |
return fig | |
except Exception as plot_e: | |
print(f"ERROR generating follower gains plot: {plot_e}") | |
tb = traceback.format_exc() | |
print(tb) | |
# Return an empty plot with an error message if plotting fails | |
fig, ax = plt.subplots(figsize=(10, 5)) | |
ax.text(0.5, 0.5, f'Error generating plot: {plot_e}', | |
horizontalalignment='center', verticalalignment='center', | |
transform=ax.transAxes, fontsize=12, color='red', wrap=True) | |
ax.set_title('Follower Gains Plot Error') | |
plt.tight_layout() | |
return fig | |
def fetch_and_render_analytics(comm_client_id, comm_token): | |
"""Fetches analytics data and prepares updates for Gradio UI.""" | |
print("--- Rendering Analytics Tab ---") | |
# Initial state for outputs | |
count_output = gr.update(value="<p>Loading follower count...</p>", visible=True) | |
plot_output = gr.update(value=None, visible=False) # Hide plot initially | |
if not comm_token: | |
print("ERROR: Marketing token missing for analytics.") | |
error_msg = "<p style='color: red; text-align: center; font-weight: bold;'>❌ Error: Missing LinkedIn Marketing token. Please complete the login process first.</p>" | |
return gr.update(value=error_msg, visible=True), gr.update(value=None, visible=False) | |
try: | |
# Fetch all data together | |
org_name, follower_count, follower_gains_list = fetch_analytics_data(comm_client_id, comm_token) | |
# Format follower count display - Nicer HTML | |
count_display_html = f""" | |
<div style='text-align: center; padding: 20px; background-color: #e7f3ff; border: 1px solid #bce8f1; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);'> | |
<p style='font-size: 1.1em; color: #31708f; margin-bottom: 5px;'>Total Followers for</p> | |
<p style='font-size: 1.4em; font-weight: bold; color: #005a9e; margin-bottom: 10px;'>{html.escape(org_name)}</p> | |
<p style='font-size: 2.8em; font-weight: bold; color: #0073b1; margin: 0;'>{follower_count:,}</p> | |
<p style='font-size: 0.9em; color: #777; margin-top: 5px;'>(As of latest data available)</p> | |
</div> | |
""" | |
count_output = gr.update(value=count_display_html, visible=True) | |
# Generate plot | |
print("Generating follower gains plot...") | |
plot_fig = plot_follower_gains(follower_gains_list) | |
# If plot generation failed, plot_fig might contain an error message plot | |
plot_output = gr.update(value=plot_fig, visible=True) | |
print("Analytics data fetched and processed successfully.") | |
return count_output, plot_output | |
except (ValueError, requests.exceptions.RequestException) as api_ve: | |
# Catch specific API or configuration errors from fetch_analytics_data | |
print(f"API or VALUE ERROR during analytics fetch: {api_ve}") | |
error_update = display_error(f"Failed to load analytics: {api_ve}", api_ve) | |
# Show error in the count area, hide plot | |
return gr.update(value=error_update.get('value', "<p style='color: red;'>Error loading follower count.</p>"), visible=True), gr.update(value=None, visible=False) | |
except Exception as e: | |
# Catch any other unexpected errors during fetch or plotting | |
print(f"UNEXPECTED ERROR during analytics rendering: {e}") | |
tb = traceback.format_exc() | |
print(tb) | |
error_update = display_error("An unexpected error occurred while loading analytics.", e) | |
error_html = error_update.get('value', "<p style='color: red;'>An unexpected error occurred.</p>") | |
# Ensure the error message is HTML-safe | |
if isinstance(error_html, str) and not error_html.strip().startswith("<"): | |
error_html = f"<pre style='color: red; white-space: pre-wrap;'>{html.escape(error_html)}</pre>" | |
# Show error in the count area, hide plot | |
return gr.update(value=error_html, visible=True), gr.update(value=None, visible=False) |