Spaces:
Running
Running
File size: 20,748 Bytes
9c8c059 07be99a f3b41b9 9c2556f 0089ba8 f3b41b9 a96ea35 9c2556f a96ea35 fe8ead0 9f1c65e 9c2556f a01c074 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 |
import json
import requests
from sessions import create_session
import html
from datetime import datetime, timezone, timedelta # Added timezone, timedelta
# Base URL of LinkedIn's "v2" API; used in this module for the
# organizationalEntityAcls lookup.
API_V2_BASE = 'https://api.linkedin.com/v2'
# Base URL of LinkedIn's "rest" API; used in this module for the posts and
# organizationalEntityShareStatistics endpoints.
API_REST_BASE = "https://api.linkedin.com/rest"
def display_error(message, e=None):
    """Log an error to stdout and format it as red HTML for a Gradio output.

    Args:
        message: Human-readable summary of what went wrong.
        e: Optional exception (or warning) behind the error. When given, its
            type, text and traceback are printed, and extra detail is appended
            to the displayed message: the HTTP status and body for a
            ``requests`` error that carries a response, the ``description``
            attribute when the exception has one, otherwise ``str(e)``.

    Returns:
        The value of ``gr.update(value=<html>, visible=True)``. If the name
        ``gr`` is not bound in this module, a plain dict carrying the same
        ``value`` and ``visible`` entries is returned instead.
    """
    # Local import: the module-level import block does not provide traceback.
    import traceback

    error_prefix = "β Error: "
    full_message = f"{error_prefix}{message}"

    if e is not None:
        print(f"--- ERROR ---")
        print(f"Message: {message}")
        print(f"Exception Type: {type(e)}")
        print(f"Exception: {e}")
        # Avoid printing traceback for simple Warnings like scope changes.
        if not isinstance(e, Warning):
            # Format the traceback of the exception we were handed, so the
            # output is right even when called outside an ``except`` block.
            tb = "".join(traceback.format_exception(type(e), e, e.__traceback__))
            print(f"Traceback:\n{tb}")
        print(f"-------------")

        if isinstance(e, requests.exceptions.RequestException) and e.response is not None:
            try:
                details_str = json.dumps(e.response.json(), indent=2)
                full_message += f"\nStatus Code: {e.response.status_code}\nDetails:\n```json\n{details_str}\n```"
            except ValueError:
                # Every JSON decode error requests can raise is a ValueError
                # subclass; json.JSONDecodeError alone misses some of them.
                full_message += f"\nStatus Code: {e.response.status_code}\nResponse Text:\n```\n{e.response.text}\n```"
        elif hasattr(e, 'description'):
            # OAuthLib errors often carry a description.
            full_message += f"\nDetails: {getattr(e, 'description', str(e))}"
        else:
            full_message += f"\nDetails: {str(e)}"
    else:
        print(f"Error: {message}")

    # Escape everything so API-provided text cannot inject markup.
    error_html = f"<p style='color: red; white-space: pre-wrap;'>{html.escape(full_message)}</p>"
    try:
        return gr.update(value=error_html, visible=True)
    except NameError:
        # ``gr`` is not imported by this module's visible import block.
        # NOTE(review): the "__type__" marker mirrors what recent Gradio
        # releases put in gr.update()'s dict -- confirm for the installed
        # version, or import gradio at module level and drop this fallback.
        return {"__type__": "update", "value": error_html, "visible": True}
def fetch_org_urn(comm_client_id, comm_token_dict):
    """Look up the first organization the token's user administers.

    Queries ``organizationalEntityAcls`` for APPROVED ADMINISTRATOR roles and
    uses the first element of the result.

    Args:
        comm_client_id: Client id handed to ``create_session``.
        comm_token_dict: Full token dictionary; must contain ``access_token``.

    Returns:
        ``(org_urn, org_name)``. ``org_urn`` is the ``organizationalTarget``
        string from the response. ``org_name`` is the projected
        ``localizedName``, or ``"Organization (<id>)"`` when no name is found.

    Raises:
        ValueError: If the token is missing, the request fails, the response
            is not a JSON object, no organization is returned, or the first
            element has no valid organization URN.
    """
    print("--- Fetching Organization URN ---")
    if not comm_token_dict or 'access_token' not in comm_token_dict:
        print("ERROR: Invalid or missing Marketing token dictionary for fetching Org URN.")
        raise ValueError("Marketing token is missing or invalid.")

    ln_mkt = create_session(comm_client_id, token=comm_token_dict)

    url = (
        f"{API_V2_BASE}/organizationalEntityAcls"
        "?q=roleAssignee&role=ADMINISTRATOR&state=APPROVED"  # orgs where user is ADMIN
        "&projection=(elements*(*,organizationalTarget~(id,localizedName)))"  # URN and name
    )
    print(f"Fetching Org URN details from: {url}")

    try:
        r = ln_mkt.get(url)
        r.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"ERROR: Failed to fetch organizationalEntityAcls with Marketing token.")
        status = e.response.status_code if e.response is not None else "N/A"
        details = ""
        if e.response is not None:
            try:
                details = f" Details: {e.response.json()}"
            except ValueError:
                # Every JSON decode error requests can raise is a ValueError
                # subclass; json.JSONDecodeError alone misses some of them.
                details = f" Response: {e.response.text[:200]}..."  # partial text only
        raise ValueError(f"Failed to fetch Organization details (Status: {status}). Check Marketing App permissions (r_organization_admin) and ensure the user is an admin of an org page.{details}") from e

    try:
        data = r.json()
    except ValueError as e:
        raise ValueError("Organization lookup returned a response that is not valid JSON.") from e
    if not isinstance(data, dict):
        raise ValueError("Organization lookup returned an unexpected (non-object) JSON payload.")
    print(f"Org URN Response Data: {json.dumps(data, indent=2)}")

    elements = data.get('elements')
    if not elements:
        print("WARNING: No organizations found where the user is an ADMINISTRATOR.")
        raise ValueError("No organizations found for this user where they have the ADMINISTRATOR role. Ensure the Marketing App has 'r_organization_admin' permission and the user is an admin of an organization page.")

    # Only the first organization is used; a user who administers several
    # orgs gets no choice here.
    org_element = elements[0]

    org_urn_full = org_element.get('organizationalTarget')
    if not org_urn_full or not isinstance(org_urn_full, str) or not org_urn_full.startswith("urn:li:organization:"):
        print(f"ERROR: Could not extract valid Organization URN ('organizationalTarget') from API response element: {org_element}")
        raise ValueError("Could not extract a valid Organization URN from the API response.")

    # The projected details sit under a key ending in 'organizationalTarget~';
    # match on the suffix to tolerate variations in the key name.
    org_name = None
    org_target_details_key = next((k for k in org_element if k.endswith('organizationalTarget~')), None)
    if org_target_details_key and isinstance(org_element.get(org_target_details_key), dict):
        org_name = org_element[org_target_details_key].get('localizedName')

    if not org_name:
        # Fall back to a name built from the id portion of the URN.
        org_id = org_urn_full.split(':')[-1]
        org_name = f"Organization ({org_id})"
        print(f"WARN: Could not find localizedName, using fallback: {org_name}")

    print(f"Found Org: {org_name} ({org_urn_full})")
    return org_urn_full, org_name
def fetch_posts_and_stats(comm_client_id, community_token, count=10, *,
                          org_urn="urn:li:organization:19010008", org_name="GRLS"):
    """Fetch an organization's latest posts and merge in their share statistics.

    Args:
        comm_client_id: Client id handed to ``create_session``.
        community_token: A token dict, or a bare access-token string (which is
            wrapped as a ``Bearer`` token dict).
        count: Number of posts requested from the posts endpoint.
        org_urn: URN of the organization whose posts are fetched. Defaults to
            the value that was previously hard-coded in this function.
        org_name: Display name returned alongside the posts; same note on the
            default.

    Returns:
        ``(posts, org_name)``. ``posts`` is a list of dicts with the keys
        ``id``, ``when``, ``text`` (HTML-escaped, truncated to 250 chars),
        ``likes``, ``comments``, ``impressions``, ``clicks``, ``shares`` and
        ``engagement`` (a percentage string). It is empty when no posts, or
        no share/ugcPost URNs, are returned.

    Raises:
        ValueError: If the token is missing or the posts request fails. A
            failing statistics batch is only logged; its posts get zero stats.
    """
    print("--- Fetching Posts and Stats ---")
    if not community_token:
        print("ERROR: Community token missing; cannot fetch posts or stats.")
        raise ValueError("Community token is missing.")

    # Accept either a ready token dict or a bare access-token string.
    comm_token_dict = (
        community_token
        if isinstance(community_token, dict)
        else {'access_token': community_token, 'token_type': 'Bearer'}
    )
    ln_comm = create_session(comm_client_id, token=comm_token_dict)

    # 1) Latest posts. The query is passed via ``params`` so the URN is
    #    percent-encoded by the HTTP layer, matching the statistics request.
    posts_url = f"{API_REST_BASE}/posts"
    posts_params = {"author": org_urn, "q": "author", "count": count, "sortBy": "LAST_MODIFIED"}
    print(f"Attempting to fetch posts from: {posts_url} with params {posts_params} using Marketing token")
    try:
        resp_posts = ln_comm.get(posts_url, params=posts_params)
        print(f"β POSTS Request Headers: {_redact_auth_header(resp_posts.request.headers)}")
        print(f"β POSTS Response Status: {resp_posts.status_code}")
        print(f"β POSTS Response Body (first 500 chars): {resp_posts.text[:500]}")
        resp_posts.raise_for_status()
        print("Fetched posts using Marketing token.")
    except requests.exceptions.RequestException as e:
        status, details = _describe_request_error(e)
        print(f"ERROR: Fetching posts failed with Marketing token (Status: {status}).{details}")
        raise ValueError(f"Failed to fetch posts using Marketing token (Status: {status}). Check permissions (r_organization_social).") from e

    raw_posts = resp_posts.json().get("elements", [])
    print(f"Fetched {len(raw_posts)} raw posts.")
    if not raw_posts:
        return [], org_name

    # 2) Only share / ugcPost URNs can be looked up in the statistics endpoint.
    post_urns = [
        urn
        for urn in (p.get("id") for p in raw_posts)
        if urn and (":share:" in urn or ":ugcPost:" in urn)
    ]
    if not post_urns:
        print("WARN: No post URNs (share or ugcPost) found in the fetched posts.")
        return [], org_name
    print(f"Post URNs to fetch stats for: {post_urns}")

    # 3) Statistics, keyed by post URN.
    stats_map = _fetch_share_stats(ln_comm, org_urn, post_urns)
    print(f"Fetched stats for {len(stats_map)} posts in total.")

    # 4) One summary dict per post that has an id; missing stats count as zero.
    combined_posts = []
    for post in raw_posts:
        post_id = post.get("id")
        if not post_id:
            continue
        combined_posts.append(_build_post_summary(post, stats_map.get(post_id, {})))
    return combined_posts, org_name


def _describe_request_error(exc):
    """Return ``(status, details)`` describing a failed HTTP call, for log/error text."""
    response = getattr(exc, "response", None)
    if response is None:
        return "N/A", ""
    try:
        details = f" Details: {response.json()}"
    except ValueError:
        # Every JSON decode error requests can raise is a ValueError subclass.
        details = f" Response: {response.text[:200]}..."
    return response.status_code, details


def _redact_auth_header(headers):
    """Return a plain-dict copy of *headers* with the Authorization value masked."""
    # The request headers carry the bearer token; never write it to the logs.
    return {
        name: ("<redacted>" if str(name).lower() == "authorization" else value)
        for name, value in headers.items()
    }


def _fetch_share_stats(session, org_urn, post_urns, batch_size=20):
    """Fetch ``totalShareStatistics`` for *post_urns* in batches; return ``{urn: stats}``."""
    stats_map = {}
    stats_url = f"{API_REST_BASE}/organizationalEntityShareStatistics"
    for start in range(0, len(post_urns), batch_size):
        batch = post_urns[start:start + batch_size]

        # Shares and ugcPosts go into separately indexed query parameters:
        # shares[0]=..., shares[1]=..., ugcPosts[0]=...
        params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
        share_idx, ugc_idx = 0, 0
        for urn in batch:
            if ':share:' in urn:
                params[f'shares[{share_idx}]'] = urn
                share_idx += 1
            elif ':ugcPost:' in urn:
                params[f'ugcPosts[{ugc_idx}]'] = urn
                ugc_idx += 1
            else:
                print(f"WARN: Skipping unknown URN type for stats: {urn}")
        if share_idx == 0 and ugc_idx == 0:
            print("WARN: Skipping stats fetch for batch as no valid share/ugcPost URNs found.")
            continue

        print(f"Fetching stats for batch from: {stats_url} with {share_idx + ugc_idx} URNs using Marketing token")
        try:
            resp_stats = session.get(stats_url, params=params)
            print(f"β STATS Request URL: {resp_stats.request.url}")
            print(f"β STATS Request Headers: {_redact_auth_header(resp_stats.request.headers)}")
            print(f"β STATS Response Status: {resp_stats.status_code}")
            print(f"β STATS Response Body (first 500 chars): {resp_stats.text[:500]}")
            resp_stats.raise_for_status()
            stats_data = resp_stats.json().get("elements", [])
        except (requests.exceptions.RequestException, ValueError) as e:
            # Best effort: a failed (or undecodable) batch is logged and
            # skipped so the remaining posts still render.
            status, details = _describe_request_error(e)
            print(f"ERROR fetching stats batch using Marketing token (Status: {status}).{details}")
            print("WARN: Skipping stats for this batch due to error.")
            continue

        print(f"Received {len(stats_data)} stats elements for this batch.")
        for elem in stats_data:
            # Each element names its post under either 'share' or 'ugcPost'.
            urn_key = elem.get('share') or elem.get('ugcPost')
            if urn_key:
                stats_map[urn_key] = elem.get('totalShareStatistics', {})
            else:
                print(f"WARN: Stats element missing 'share' or 'ugcPost' key: {elem}")
    return stats_map


def _dig(obj, *keys):
    """Follow *keys* through nested dicts; return None once a level is missing or not a dict."""
    for key in keys:
        if not isinstance(obj, dict):
            return None
        obj = obj.get(key)
    return obj


def _extract_post_text(post):
    """Return the first non-empty text found in *post*, trying the known field layouts in order."""
    candidates = (
        post.get("commentary"),
        _dig(post, "specificContent", "com.linkedin.ugc.ShareContent", "shareCommentaryV2", "text"),
        _dig(post, "commentaryV2", "text"),
    )
    for candidate in candidates:
        if candidate:
            return candidate

    article_title = _dig(post, "specificContent", "com.linkedin.ugc.ArticleContent", "title")
    if article_title:
        return f"Article: {article_title}"

    content_text = _dig(post, "content", "text", "text")
    if content_text:
        return content_text
    return "[Media post or share without text]"


def _build_post_summary(post, stats):
    """Combine one raw post and its statistics dict into the display record."""
    # Prefer publishedAt, fall back to createdAt.
    timestamp = post.get("publishedAt") or post.get("createdAt")
    if timestamp:
        # Value is divided by 1000 (milliseconds -> seconds); the result is a
        # naive datetime in the server's local timezone.
        when = datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M")
    else:
        when = "Unknown Date"

    text = _extract_post_text(post)
    # Truncate first, then escape, then turn newlines into <br> for HTML.
    display_text = html.escape(text[:250]).replace("\n", "<br>") + ("..." if len(text) > 250 else "")

    # ``or 0`` also covers keys that are present but null.
    impressions = stats.get("impressionCount", 0) or 0
    likes = stats.get("likeCount", 0) or 0
    comments = stats.get("commentCount", 0) or 0
    clicks = stats.get("clickCount", 0) or 0
    shares = stats.get("shareCount", 0) or 0

    # Manual rate: interactions per impression, as a percentage.
    interactions = likes + comments + clicks + shares
    engagement_rate = (interactions / impressions * 100) if impressions > 0 else 0.0

    # Prefer the 'engagement' stat when present; the code scales it by 100, so
    # it is treated as a fraction. Keep the manual rate if it is not numeric.
    engagement_api = stats.get('engagement')
    if engagement_api is not None:
        try:
            engagement_rate = float(engagement_api) * 100
        except (ValueError, TypeError):
            print(f"WARN: Ignoring non-numeric 'engagement' value: {engagement_api!r}")

    return {
        "id": post.get("id"), "when": when, "text": display_text,
        "likes": likes, "comments": comments, "impressions": impressions,
        "clicks": clicks, "shares": shares,
        "engagement": f"{engagement_rate:.2f}%",
    }
def render_post_cards(posts, org_name):
    """Build the dashboard HTML: a heading followed by one card per post.

    Args:
        posts: Iterable of post dicts. Each must have ``when`` and ``text``
            (``text`` is inserted as-is, so it must already be HTML-safe);
            the counters and ``engagement`` are optional.
        org_name: Organization name shown in the heading; escaped here. A
            falsy value is shown as "Your Organization".

    Returns:
        An HTML string. With no posts it is a single "no recent posts" heading.
    """
    heading_name = "Your Organization"
    if org_name:
        heading_name = html.escape(org_name)

    if not posts:
        return f"<h2 style='text-align: center; color: #555;'>No recent posts found for {heading_name}.</h2>"

    # Collect the page in pieces and join once at the end.
    fragments = [
        f"<h2 style='text-align: center; margin-bottom: 20px;'>Recent Posts for {heading_name}</h2><div style='display: flex; flex-wrap: wrap; gap: 15px; justify-content: center;'>"
    ]
    for post in posts:
        # post['text'] is inserted without further escaping (see docstring).
        card = f"""
<div style="border: 1px solid #ccc; border-radius: 8px; padding: 15px; margin: 5px; width: 280px;
box-shadow: 2px 2px 5px rgba(0,0,0,0.1); background-color: #fff; display: flex;
flex-direction: column; justify-content: space-between; min-height: 220px; /* Adjusted min-height */">
<div>
<div style="font-size: 0.8em; color: #666; margin-bottom: 8px; border-bottom: 1px dashed #eee; padding-bottom: 5px;">{post['when']}</div>
<div style="font-size: 0.95em; margin-bottom: 12px; word-wrap: break-word; max-height: 120px; overflow-y: auto; padding-right: 5px;">{post['text']}</div>
</div>
<div style="font-size: 0.9em; color: #333; border-top: 1px solid #eee; padding-top: 10px; margin-top: auto; line-height: 1.6;">
<span title="Impressions">ποΈ {post.get('impressions', 0):,}</span> |
<span title="Likes">π {post.get('likes', 0):,}</span> |
<span title="Comments">π¬ {post.get('comments', 0):,}</span> |
<span title="Shares">π {post.get('shares', 0):,}</span> |
<span title="Clicks">π±οΈ {post.get('clicks', 0):,}</span><br>
<span title="Engagement Rate" style="font-weight: bold;">π {post.get('engagement', '0.00%')}</span>
</div>
</div>
"""
        fragments.append(card)
    fragments.append("</div>")
    return "".join(fragments)
def fetch_and_render_dashboard(comm_client_id, community_token):
    """Fetch post data and return the dashboard as an HTML string.

    Never raises for fetch/render failures: every error is turned into an
    HTML error message via ``display_error``.

    Args:
        comm_client_id: Client id forwarded to ``fetch_posts_and_stats``. A
            falsy value short-circuits with an error message.
        community_token: Token forwarded to ``fetch_posts_and_stats``.

    Returns:
        HTML for the post cards on success, otherwise HTML describing the error.
    """
    print("--- Rendering Dashboard ---")
    if not comm_client_id:
        print("ERROR: comm_client_id missing for dashboard rendering.")
        return "<p style='color: red; text-align: center; font-weight: bold;'>β Error: Missing LinkedIn Marketing token. Please complete the login process on the 'Login' tab.</p>"

    try:
        print("Fetching posts and stats for dashboard...")
        posts_data, org_name = fetch_posts_and_stats(comm_client_id, community_token)
        print(f"Rendering {len(posts_data)} posts for {org_name}.")
        if not org_name:
            org_name = "[Organization Name Not Found]"
        return render_post_cards(posts_data, org_name)
    except ValueError as ve:
        # Raised for missing tokens and failed/invalid API responses.
        print(f"VALUE ERROR during dashboard fetch: {ve}")
        error_update = display_error(f"Configuration or API Error: {ve}", ve)
        return error_update.get('value', "<p style='color: red; text-align: center;'>β A configuration or API error occurred.</p>")
    except requests.exceptions.RequestException as req_err:
        print(f"HTTP ERROR during dashboard fetch: {req_err}")
        # Compare against None: a requests Response is falsy for 4xx/5xx, so
        # a plain truth test would report "N/A" for exactly the failures that
        # do carry a status code.
        status_code = req_err.response.status_code if req_err.response is not None else "N/A"
        error_update = display_error(f"API Request Failed (Status: {status_code}). Check permissions/scopes or API status.", req_err)
        return error_update.get('value', f"<p style='color: red; text-align: center;'>β API Error: {status_code}. Check console logs.</p>")
    except Exception as e:
        # Top-level boundary for the UI: show the failure instead of crashing.
        print(f"UNEXPECTED ERROR during dashboard fetch: {e}")
        error_update = display_error("Failed to fetch or render dashboard data.", e)
        error_html = error_update.get('value', "<p style='color: red; text-align: center;'>β An unexpected error occurred. Check console logs.</p>")
        # If the value came back as plain text rather than markup, escape it.
        if isinstance(error_html, str) and not error_html.strip().startswith("<"):
            error_html = f"<pre style='color: red; white-space: pre-wrap;'>{html.escape(error_html)}</pre>"
        return error_html
|