Spaces:
Running
Running
import re | |
from io import BytesIO | |
import requests | |
import tweepy | |
import os | |
from dotenv import load_dotenv | |
load_dotenv() | |
LINKEDIN_CLIENT_ID = os.getenv("LINKEDIN_CLIENT_ID") | |
LINKEDIN_CLIENT_SECRET = os.getenv("LINKEDIN_CLIENT_SECRET") | |
TWITTER_CLIENT_ID = os.getenv("TWITTER_CLIENT_ID") | |
TWITTER_CLIENT_SECRET = os.getenv("TWITTER_CLIENT_SECRET") | |
def extract_image_url(entry): | |
"""Extract an image URL from an RSS feed entry.""" | |
for enclosure in entry.get('enclosures', []): | |
if enclosure.get('type', '').startswith('image/'): | |
return enclosure.get('url') | |
for media in entry.get('media_content', []): | |
if media.get('type', '').startswith('image/'): | |
return media.get('url') | |
for thumbnail in entry.get('media_thumbnail', []): | |
if thumbnail.get('url'): | |
return thumbnail.get('url') | |
if 'image' in entry: | |
image = entry['image'] | |
if isinstance(image, dict) and 'url' in image: | |
return image['url'] | |
elif isinstance(image, list): | |
for img in image: | |
if 'url' in img: | |
return img['url'] | |
if 'itunes_image' in entry: | |
return entry['itunes_image'].get('href') | |
for field in ['description', 'summary', 'content']: | |
if field in entry: | |
content = entry[field] | |
if isinstance(content, list): | |
content = content[0].get('value', '') | |
elif isinstance(content, dict): | |
content = content.get('value', '') | |
else: | |
content = str(content) | |
match = re.search(r'<img[^>]+src=["\'](.*?)["\']', content, re.I) | |
if match: | |
return match.group(1) | |
return None | |
def post_to_linkedin(post): | |
"""Post content to LinkedIn with optional image.""" | |
if post['status'] not in ['pending', 'posting']: | |
return | |
access_token = post['access_token'] | |
print("linkedin_access_token",access_token) | |
linkedin_id = post['linkedin_id'] | |
image_url = post.get('image_url') | |
headers = { | |
'Authorization': f'Bearer {access_token}', | |
'Content-Type': 'application/json', | |
} | |
if image_url: | |
response = requests.get(image_url, timeout=10) | |
if response.status_code == 200: | |
image_content = response.content | |
register_url = 'https://api.linkedin.com/v2/assets?action=registerUpload' | |
register_body = { | |
'registerUploadRequest': { | |
'recipes': ['urn:li:digitalmediaRecipe:feedshare-image'], | |
'owner': f'urn:li:person:{linkedin_id}', | |
'serviceRelationships': [ | |
{'relationshipType': 'OWNER', 'identifier': 'urn:li:userGeneratedContent'} | |
] | |
} | |
} | |
register_response = requests.post(register_url, headers=headers, json=register_body) | |
if register_response.status_code == 200: | |
upload_data = register_response.json()['value'] | |
upload_url = upload_data['uploadMechanism']['com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest']['uploadUrl'] | |
asset = upload_data['asset'] | |
upload_headers = {'Authorization': f'Bearer {access_token}'} | |
upload_response = requests.put(upload_url, headers=upload_headers, data=image_content) | |
if upload_response.status_code == 201: | |
api_url = 'https://api.linkedin.com/v2/ugcPosts' | |
post_body = { | |
'author': f'urn:li:person:{linkedin_id}', | |
'lifecycleState': 'PUBLISHED', | |
'specificContent': { | |
'com.linkedin.ugc.ShareContent': { | |
'shareCommentary': {'text': post['text']}, | |
'shareMediaCategory': 'IMAGE', | |
'media': [{'status': 'READY', 'media': asset}] | |
} | |
}, | |
'visibility': {'com.linkedin.ugc.MemberNetworkVisibility': 'PUBLIC'} | |
} | |
response = requests.post(api_url, headers=headers, json=post_body) | |
post['status'] = 'posted' if response.status_code == 201 else 'failed' | |
print(f"LinkedIn post attempt: {response.status_code} - {response.text}") | |
else: | |
print(f"Image upload failed: {upload_response.status_code}") | |
else: | |
print(f"Upload registration failed: {register_response.status_code}") | |
else: | |
print(f"Image download failed: {response.status_code}") | |
if post['status'] != 'posted': | |
api_url = 'https://api.linkedin.com/v2/ugcPosts' | |
post_body = { | |
'author': f'urn:li:person:{linkedin_id}', | |
'lifecycleState': 'PUBLISHED', | |
'specificContent': { | |
'com.linkedin.ugc.ShareContent': { | |
'shareCommentary': {'text': post['text']}, | |
'shareMediaCategory': 'NONE' | |
} | |
}, | |
'visibility': {'com.linkedin.ugc.MemberNetworkVisibility': 'PUBLIC'} | |
} | |
response = requests.post(api_url, headers=headers, json=post_body) | |
post['status'] = 'posted' if response.status_code == 201 else 'failed' | |
print(f"LinkedIn text-only post: {response.status_code} - {response.text}") | |
else: | |
api_url = 'https://api.linkedin.com/v2/ugcPosts' | |
post_body = { | |
'author': f'urn:li:person:{linkedin_id}', | |
'lifecycleState': 'PUBLISHED', | |
'specificContent': { | |
'com.linkedin.ugc.ShareContent': { | |
'shareCommentary': {'text': post['text']}, | |
'shareMediaCategory': 'NONE' | |
} | |
}, | |
'visibility': {'com.linkedin.ugc.MemberNetworkVisibility': 'PUBLIC'} | |
} | |
response = requests.post(api_url, headers=headers, json=post_body) | |
post['status'] = 'posted' if response.status_code == 201 else 'failed' | |
print(f"LinkedIn post attempt: {response.status_code} - {response.text}") | |
def post_to_twitter(post): | |
"""Post content to Twitter with optional image.""" | |
if post['status'] not in ['pending', 'posting']: | |
return | |
client = tweepy.Client( | |
consumer_key=TWITTER_CLIENT_ID, | |
consumer_secret=TWITTER_CLIENT_SECRET, | |
access_token=post['access_token'], | |
access_token_secret=post['access_token_secret'] | |
) | |
print("access_token_secret",client.access_token_secret) | |
image_url = post.get('image_url') | |
if image_url: | |
response = requests.get(image_url, timeout=10) | |
if response.status_code == 200: | |
image_content = BytesIO(response.content) | |
try: | |
api = tweepy.API(tweepy.OAuth1UserHandler( | |
TWITTER_CLIENT_ID, TWITTER_CLIENT_SECRET, | |
post['access_token'], post['access_token_secret'] | |
)) | |
media = api.media_upload(filename='image', file=image_content) | |
client.create_tweet(text=post['text'], media_ids=[media.media_id]) | |
post['status'] = 'posted' | |
print("Twitter post with image successful") | |
except tweepy.TweepyException as e: | |
print(f"Twitter image post error: {e}") | |
try: | |
client.create_tweet(text=post['text']) | |
post['status'] = 'posted' | |
print("Twitter text-only post successful") | |
except tweepy.TweepyException as e: | |
post['status'] = 'failed' | |
print(f"Twitter text-only error: {e}") | |
except Exception as e: | |
print(f"Media upload error: {e}") | |
else: | |
print(f"Image download failed: {response.status_code}") | |
try: | |
client.create_tweet(text=post['text']) | |
post['status'] = 'posted' | |
print("Twitter text-only post successful") | |
except tweepy.TweepyException as e: | |
post['status'] = 'failed' | |
print(f"Twitter text-only error: {e}") | |
else: | |
try: | |
client.create_tweet(text=post['text']) | |
post['status'] = 'posted' | |
print("Twitter post successful") | |
except tweepy.TweepyException as e: | |
post['status'] = 'failed' | |
print(f"Twitter error: {e}") |