import os
import csv
import urllib.parse
import json
import logging
import asyncio
import time
import random
from typing import Any, Dict, Optional

import aiohttp
from aiohttp import ClientSession

import gradio as gr

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Rate-limit settings: one token per second with a bucket capacity of one,
# i.e. at most one request per second per platform, with no burst headroom.
LINKEDIN_RATE_LIMIT = 1
FACEBOOK_RATE_LIMIT = 1
RATE_LIMIT_PERIOD = 1  # seconds (currently unused)
MAX_TOKENS = 1


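# TokenBucket implements the classic token-bucket algorithm: tokens refill
# continuously at `rate` per second up to `max_tokens`, and each API call
# consumes one token, so bursts are smoothed down to the configured rate.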
class TokenBucket:
    def __init__(self, rate: float, max_tokens: int):
        self.rate = rate
        self.max_tokens = max_tokens
        self.tokens = max_tokens
        self.last_refill = time.monotonic()
        self.lock = asyncio.Lock()

    async def get_token(self) -> bool:
        async with self.lock:
            now = time.monotonic()
            time_passed = now - self.last_refill
            self.tokens = min(self.max_tokens, self.tokens + time_passed * self.rate)
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

    async def wait_for_token(self):
        while not await self.get_token():
            await asyncio.sleep(1 / self.rate)


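# Reads a secret from the environment. Kept async so call sites can await it
# uniformly alongside the other coroutines.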
async def get_secret(secret_name: str) -> Optional[str]:
    return os.environ.get(secret_name)


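# Makes a rate-limited HTTP call with retries: waits for a bucket token, adds
# a small jitter, and retries 503 responses with exponential backoff.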
async def rate_limited_api_call(session: ClientSession, bucket: TokenBucket, url: str,
                                headers: Dict[str, str], params: Optional[Dict[str, Any]] = None,
                                data: Optional[Dict[str, Any]] = None, method: str = 'GET') -> Dict[str, Any]:
    max_retries = 5
    base_delay = 5

    for attempt in range(max_retries):
        await bucket.wait_for_token()

        # Small random delay so concurrent tasks do not fire in lockstep.
        await asyncio.sleep(random.uniform(0.1, 0.5))

        try:
            if method == 'GET':
                async with session.get(url, headers=headers, params=params) as response:
                    response.raise_for_status()
                    return await response.json()
            elif method == 'POST':
                async with session.post(url, headers=headers, params=params, json=data) as response:
                    response.raise_for_status()
                    return await response.json()
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")
        except aiohttp.ClientResponseError as e:
            if e.status == 503 and attempt < max_retries - 1:
                # Exponential backoff with jitter: 5s, 10s, 20s, ... plus up to 1s of noise.
                retry_delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                logging.warning(f"Attempt {attempt + 1} failed with 503 error. Retrying in {retry_delay:.2f} seconds...")
                await asyncio.sleep(retry_delay)
            else:
                logging.error(f"API call failed: {str(e)}")
                raise
        except Exception as e:
            logging.error(f"Unexpected error in API call: {str(e)}")
            raise

    raise Exception(f"Failed after {max_retries} attempts")


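# Scrapes a LinkedIn profile through the Zyla Labs profile-scraper API and
# maps the response into a flat dict of display fields.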
async def scrape_linkedin_profile(session: ClientSession, bucket: TokenBucket, url: str) -> Dict[str, Any]:
    api_key = await get_secret('LINKEDIN_API_KEY')
    api_endpoint = "https://zylalabs.com/api/1511/linkedin+profile+scraper+api/5771/profile+scraper"

    if not api_key:
        logging.error("LinkedIn API key not found in environment variables")
        return {"Error": "LinkedIn API key not found"}

    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }

    data = {
        "link": url
    }

    try:
        result = await rate_limited_api_call(session, bucket, api_endpoint, headers, data=data, method='POST')

        if 'data' in result:
            profile_data = result['data']
            return {
                "Name": profile_data.get('fullName', ''),
                "Number of Followers": profile_data.get('followers', ''),
                "Number of Connections": profile_data.get('connections', ''),
                "Age": profile_data.get('age', ''),
                "Current Role": profile_data.get('headline', ''),
                "Sex": profile_data.get('gender', ''),
                "Place of living": profile_data.get('addressWithCountry', '')
            }
        else:
            error_message = f"API error for LinkedIn profile {url}: {result.get('message', 'Unknown error')}"
            logging.error(error_message)
            return {"Error": error_message}
    except Exception as e:
        error_message = f"Error scraping LinkedIn profile {url}: {str(e)}"
        logging.error(error_message)
        return {"Error": error_message}


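# Scrapes a Facebook profile via the endpoint configured in the environment;
# the profile id is taken from the last path segment of the URL.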
async def scrape_facebook_profile(session: ClientSession, bucket: TokenBucket, url: str) -> Dict[str, Any]:
    api_key = await get_secret('FACEBOOK_API_KEY')
    api_endpoint = await get_secret('FACEBOOK_API_ENDPOINT')
    if not api_key or not api_endpoint:
        logging.error("Facebook API key or endpoint not found in environment variables")
        return {"Error": "Facebook API key or endpoint not found in environment variables"}

    # Normalize the endpoint: trim whitespace and percent-encode everything
    # except the scheme and path separators.
    api_endpoint = api_endpoint.strip()
    api_endpoint = urllib.parse.quote(api_endpoint, safe=':/')

    logging.info(f"Facebook API Endpoint: {api_endpoint}")

    headers = {
        'Authorization': f'Bearer {api_key}'
    }

    profile_id = url.split('/')[-1].strip()

    params = {'id': profile_id}

    try:
        result = await rate_limited_api_call(session, bucket, api_endpoint, headers, params=params, method='POST')

        if 'error' not in result:
            return {
                "Full Name": result.get('name', ''),
                "Location": result.get('location', ''),
                "Phone": result.get('phone', ''),
                "Email": result.get('email', ''),
                "Follower Count": result.get('follower_count', ''),
                "Friend Count": result.get('friend_count', '')
            }
        else:
            error_message = f"API error for Facebook profile {url}: {result.get('error')}"
            logging.error(error_message)
            return {"Error": error_message}
    except aiohttp.ClientResponseError as e:
        error_message = f"HTTP error when scraping Facebook profile {url}: {e.status} {e.message}"
        logging.error(error_message)
        return {"Error": error_message}
    except Exception as e:
        error_message = f"Error scraping Facebook profile {url}: {str(e)}"
        logging.error(error_message)
        return {"Error": error_message}


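# Reads profile URLs from the first column of the uploaded CSV, routes each
# URL to the matching rate-limit bucket, scrapes them concurrently, and
# returns the results as pretty-printed JSON.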
async def process_csv(csv_file, scrape_function):
    try:
        with open(csv_file.name, 'r') as file:
            reader = csv.reader(file)
            profile_urls = [row[0] for row in reader if row]

        linkedin_bucket = TokenBucket(LINKEDIN_RATE_LIMIT, MAX_TOKENS)
        facebook_bucket = TokenBucket(FACEBOOK_RATE_LIMIT, MAX_TOKENS)

        async with aiohttp.ClientSession() as session:
            tasks = []
            for url in profile_urls:
                if 'linkedin.com' in url:
                    tasks.append(scrape_function(session, linkedin_bucket, url))
                elif 'facebook.com' in url:
                    tasks.append(scrape_function(session, facebook_bucket, url))
                else:
                    logging.warning(f"Skipping unrecognized URL: {url}")

            results = await asyncio.gather(*tasks)

        return json.dumps([r for r in results if r is not None], indent=2)
    except Exception as e:
        logging.error(f"Error processing CSV file: {str(e)}")
        return f"Error processing CSV file: {str(e)}"


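# Scrapes only the first URL in the CSV so the user can sanity-check the
# output format before running a full scrape.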
async def preview_csv(csv_file, scrape_function):
    try:
        with open(csv_file.name, 'r') as file:
            reader = csv.reader(file)
            profile_urls = [row[0] for row in reader if row]

        if not profile_urls:
            return "No URLs found in the CSV file."

        preview_url = profile_urls[0]
        bucket = TokenBucket(LINKEDIN_RATE_LIMIT if 'linkedin.com' in preview_url else FACEBOOK_RATE_LIMIT, MAX_TOKENS)

        async with aiohttp.ClientSession() as session:
            preview_data = await scrape_function(session, bucket, preview_url)

        if preview_data:
            return json.dumps(preview_data, indent=2)
        else:
            return "Error fetching preview data."
    except Exception as e:
        logging.error(f"Error previewing CSV file: {str(e)}")
        return f"Error previewing CSV file: {str(e)}"


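# Builds the Gradio widgets for one scraper tab and wires up the preview,
# scrape, and CSV-download actions.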
def create_interface(name, scrape_function):
    input_file = gr.File(label=f"Upload CSV file with {name} profile URLs")
    preview = gr.Textbox(label="Preview (First Profile)", max_lines=10)
    output = gr.Textbox(label=f"Scraped {name} Data")
    preview_button = gr.Button("Preview")
    clear_preview_button = gr.Button("Clear Preview")
    scrape_button = gr.Button(f"Scrape {name}")
    download = gr.File(label="Download Scraped Data as CSV")

    def clear_preview():
        return ""

    def scrape_and_download(csv_file):
        result = asyncio.run(process_csv(csv_file, scrape_function))
        if isinstance(result, str) and result.startswith("Error"):
            return None
        data = json.loads(result)
        output_file = f"{scrape_function.__name__}_scraped_data.csv"

        def clean_text(text):
            return str(text).replace('\n', ' ').strip()

        # Each profile is written as one "key: value" cell per row, with a
        # blank row separating profiles; errors get their own two-cell row.
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for item in data:
                if "Error" in item:
                    writer.writerow(["Error", clean_text(item["Error"])])
                else:
                    for key, value in item.items():
                        writer.writerow([f"{key}: {clean_text(value)}"])
                writer.writerow([])

        return output_file

    preview_button.click(lambda x: asyncio.run(preview_csv(x, scrape_function)), inputs=input_file, outputs=preview)
    clear_preview_button.click(clear_preview, inputs=None, outputs=preview)
    # Both handlers fire on the same click: one fills the output textbox, the
    # other writes the downloadable CSV.
    scrape_button.click(lambda x: asyncio.run(process_csv(x, scrape_function)), inputs=input_file, outputs=output)
    scrape_button.click(scrape_and_download, inputs=input_file, outputs=download)

    return input_file, preview, output, download


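# Assembles the two scraper tabs into a single Gradio Blocks app.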
def create_app():
    with gr.Blocks() as demo:
        gr.Markdown("# Multi-functional Data Processing Application")

        with gr.Tab("LinkedIn Scraper"):
            create_interface("LinkedIn", scrape_linkedin_profile)

        with gr.Tab("Facebook Scraper"):
            create_interface("Facebook", scrape_facebook_profile)

    return demo


app = create_app()

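# When hosted on Hugging Face Spaces (SPACE_ID is set by the platform),
# launch normally; locally, enable a public share link for testing.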
if __name__ == "__main__":
    logging.info(f"Facebook API Endpoint: {os.environ.get('FACEBOOK_API_ENDPOINT', 'Not set')}")
    if "SPACE_ID" in os.environ:
        app.launch()
    else:
        app.launch(share=True)