RssCraigC / helpers.py
AiDeveloper1's picture
Upload 5 files
14c24bc verified
import re
from io import BytesIO
import requests
import tweepy
import os
from dotenv import load_dotenv
load_dotenv()
LINKEDIN_CLIENT_ID = os.getenv("LINKEDIN_CLIENT_ID")
LINKEDIN_CLIENT_SECRET = os.getenv("LINKEDIN_CLIENT_SECRET")
TWITTER_CLIENT_ID = os.getenv("TWITTER_CLIENT_ID")
TWITTER_CLIENT_SECRET = os.getenv("TWITTER_CLIENT_SECRET")
def extract_image_url(entry):
"""Extract an image URL from an RSS feed entry."""
for enclosure in entry.get('enclosures', []):
if enclosure.get('type', '').startswith('image/'):
return enclosure.get('url')
for media in entry.get('media_content', []):
if media.get('type', '').startswith('image/'):
return media.get('url')
for thumbnail in entry.get('media_thumbnail', []):
if thumbnail.get('url'):
return thumbnail.get('url')
if 'image' in entry:
image = entry['image']
if isinstance(image, dict) and 'url' in image:
return image['url']
elif isinstance(image, list):
for img in image:
if 'url' in img:
return img['url']
if 'itunes_image' in entry:
return entry['itunes_image'].get('href')
for field in ['description', 'summary', 'content']:
if field in entry:
content = entry[field]
if isinstance(content, list):
content = content[0].get('value', '')
elif isinstance(content, dict):
content = content.get('value', '')
else:
content = str(content)
match = re.search(r'<img[^>]+src=["\'](.*?)["\']', content, re.I)
if match:
return match.group(1)
return None
def post_to_linkedin(post):
"""Post content to LinkedIn with optional image."""
if post['status'] not in ['pending', 'posting']:
return
access_token = post['access_token']
print("linkedin_access_token",access_token)
linkedin_id = post['linkedin_id']
image_url = post.get('image_url')
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json',
}
if image_url:
response = requests.get(image_url, timeout=10)
if response.status_code == 200:
image_content = response.content
register_url = 'https://api.linkedin.com/v2/assets?action=registerUpload'
register_body = {
'registerUploadRequest': {
'recipes': ['urn:li:digitalmediaRecipe:feedshare-image'],
'owner': f'urn:li:person:{linkedin_id}',
'serviceRelationships': [
{'relationshipType': 'OWNER', 'identifier': 'urn:li:userGeneratedContent'}
]
}
}
register_response = requests.post(register_url, headers=headers, json=register_body)
if register_response.status_code == 200:
upload_data = register_response.json()['value']
upload_url = upload_data['uploadMechanism']['com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest']['uploadUrl']
asset = upload_data['asset']
upload_headers = {'Authorization': f'Bearer {access_token}'}
upload_response = requests.put(upload_url, headers=upload_headers, data=image_content)
if upload_response.status_code == 201:
api_url = 'https://api.linkedin.com/v2/ugcPosts'
post_body = {
'author': f'urn:li:person:{linkedin_id}',
'lifecycleState': 'PUBLISHED',
'specificContent': {
'com.linkedin.ugc.ShareContent': {
'shareCommentary': {'text': post['text']},
'shareMediaCategory': 'IMAGE',
'media': [{'status': 'READY', 'media': asset}]
}
},
'visibility': {'com.linkedin.ugc.MemberNetworkVisibility': 'PUBLIC'}
}
response = requests.post(api_url, headers=headers, json=post_body)
post['status'] = 'posted' if response.status_code == 201 else 'failed'
print(f"LinkedIn post attempt: {response.status_code} - {response.text}")
else:
print(f"Image upload failed: {upload_response.status_code}")
else:
print(f"Upload registration failed: {register_response.status_code}")
else:
print(f"Image download failed: {response.status_code}")
if post['status'] != 'posted':
api_url = 'https://api.linkedin.com/v2/ugcPosts'
post_body = {
'author': f'urn:li:person:{linkedin_id}',
'lifecycleState': 'PUBLISHED',
'specificContent': {
'com.linkedin.ugc.ShareContent': {
'shareCommentary': {'text': post['text']},
'shareMediaCategory': 'NONE'
}
},
'visibility': {'com.linkedin.ugc.MemberNetworkVisibility': 'PUBLIC'}
}
response = requests.post(api_url, headers=headers, json=post_body)
post['status'] = 'posted' if response.status_code == 201 else 'failed'
print(f"LinkedIn text-only post: {response.status_code} - {response.text}")
else:
api_url = 'https://api.linkedin.com/v2/ugcPosts'
post_body = {
'author': f'urn:li:person:{linkedin_id}',
'lifecycleState': 'PUBLISHED',
'specificContent': {
'com.linkedin.ugc.ShareContent': {
'shareCommentary': {'text': post['text']},
'shareMediaCategory': 'NONE'
}
},
'visibility': {'com.linkedin.ugc.MemberNetworkVisibility': 'PUBLIC'}
}
response = requests.post(api_url, headers=headers, json=post_body)
post['status'] = 'posted' if response.status_code == 201 else 'failed'
print(f"LinkedIn post attempt: {response.status_code} - {response.text}")
def post_to_twitter(post):
"""Post content to Twitter with optional image."""
if post['status'] not in ['pending', 'posting']:
return
client = tweepy.Client(
consumer_key=TWITTER_CLIENT_ID,
consumer_secret=TWITTER_CLIENT_SECRET,
access_token=post['access_token'],
access_token_secret=post['access_token_secret']
)
print("access_token_secret",client.access_token_secret)
image_url = post.get('image_url')
if image_url:
response = requests.get(image_url, timeout=10)
if response.status_code == 200:
image_content = BytesIO(response.content)
try:
api = tweepy.API(tweepy.OAuth1UserHandler(
TWITTER_CLIENT_ID, TWITTER_CLIENT_SECRET,
post['access_token'], post['access_token_secret']
))
media = api.media_upload(filename='image', file=image_content)
client.create_tweet(text=post['text'], media_ids=[media.media_id])
post['status'] = 'posted'
print("Twitter post with image successful")
except tweepy.TweepyException as e:
print(f"Twitter image post error: {e}")
try:
client.create_tweet(text=post['text'])
post['status'] = 'posted'
print("Twitter text-only post successful")
except tweepy.TweepyException as e:
post['status'] = 'failed'
print(f"Twitter text-only error: {e}")
except Exception as e:
print(f"Media upload error: {e}")
else:
print(f"Image download failed: {response.status_code}")
try:
client.create_tweet(text=post['text'])
post['status'] = 'posted'
print("Twitter text-only post successful")
except tweepy.TweepyException as e:
post['status'] = 'failed'
print(f"Twitter text-only error: {e}")
else:
try:
client.create_tweet(text=post['text'])
post['status'] = 'posted'
print("Twitter post successful")
except tweepy.TweepyException as e:
post['status'] = 'failed'
print(f"Twitter error: {e}")