import os
import csv
import urllib.parse
import json
import logging
import asyncio
import aiohttp
import time
import random
from asyncio import Semaphore
from aiohttp import ClientSession
from typing import Dict, Any
import gradio as gr

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Rate limiting configuration
LINKEDIN_RATE_LIMIT = 1
FACEBOOK_RATE_LIMIT = 1
RATE_LIMIT_PERIOD = 1
MAX_TOKENS = 1


class TokenBucket:
    """Simple token-bucket rate limiter shared by all calls to one API."""

    def __init__(self, rate: float, max_tokens: int):
        self.rate = rate
        self.max_tokens = max_tokens
        self.tokens = max_tokens
        self.last_refill = time.monotonic()
        self.lock = asyncio.Lock()

    async def get_token(self):
        # Refill the bucket based on elapsed time, then try to consume one token.
        async with self.lock:
            now = time.monotonic()
            time_passed = now - self.last_refill
            self.tokens = min(self.max_tokens, self.tokens + time_passed * self.rate)
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            else:
                return False

    async def wait_for_token(self):
        # Block until a token becomes available.
        while not await self.get_token():
            await asyncio.sleep(1 / self.rate)


async def get_secret(secret_name: str) -> str:
    # Secrets are read from environment variables.
    return os.environ.get(secret_name)


async def rate_limited_api_call(session: ClientSession, bucket: TokenBucket, url: str,
                                headers: Dict[str, str], params: Dict[str, Any] = None,
                                data: Dict[str, Any] = None, method: str = 'GET') -> Dict[str, Any]:
    # Make a rate-limited HTTP call, retrying 503 responses with exponential backoff.
    max_retries = 5
    base_delay = 5  # seconds
    for attempt in range(max_retries):
        await bucket.wait_for_token()
        # Add a small random delay before each API call
        await asyncio.sleep(random.uniform(0.1, 0.5))
        try:
            if method == 'GET':
                async with session.get(url, headers=headers, params=params) as response:
                    response.raise_for_status()
                    return await response.json()
            elif method == 'POST':
                async with session.post(url, headers=headers, params=params, json=data) as response:
                    response.raise_for_status()
                    return await response.json()
        except aiohttp.ClientResponseError as e:
            if e.status == 503 and attempt < max_retries - 1:
                retry_delay = base_delay * (2 ** attempt) + random.uniform(0, 1)  # Exponential backoff with jitter
                logging.warning(f"Attempt {attempt + 1} failed with 503 error. Retrying in {retry_delay:.2f} seconds...")
                await asyncio.sleep(retry_delay)
            else:
                logging.error(f"API call failed: {str(e)}")
                raise
        except Exception as e:
            logging.error(f"Unexpected error in API call: {str(e)}")
            raise
    raise Exception(f"Failed after {max_retries} attempts")


async def scrape_linkedin_profile(session: ClientSession, bucket: TokenBucket, url: str) -> Dict[str, Any]:
    api_key = await get_secret('LINKEDIN_API_KEY')
    api_endpoint = "https://zylalabs.com/api/1511/linkedin+profile+scraper+api/5771/profile+scraper"
    if not api_key:
        logging.error("LinkedIn API key not found in environment variables")
        return {"Error": "LinkedIn API key not found"}

    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }
    data = {
        "link": url
    }

    try:
        result = await rate_limited_api_call(session, bucket, api_endpoint, headers, data=data, method='POST')
        if 'data' in result:
            profile_data = result['data']
            return {
                "Name": profile_data.get('fullName', ''),
                "Number of Followers": profile_data.get('followers', ''),
                "Number of Connections": profile_data.get('connections', ''),
                "Age": profile_data.get('age', ''),
                "Current Role": profile_data.get('headline', ''),
                "Sex": profile_data.get('gender', ''),
                "Place of living": profile_data.get('addressWithCountry', '')
            }
        else:
            error_message = f"API error for LinkedIn profile {url}: {result.get('message', 'Unknown error')}"
            logging.error(error_message)
            return {"Error": error_message}
    except Exception as e:
        error_message = f"Error scraping LinkedIn profile {url}: {str(e)}"
        logging.error(error_message)
        return {"Error": error_message}


async def scrape_facebook_profile(session: ClientSession, bucket: TokenBucket, url: str) -> Dict[str, Any]:
    api_key = await get_secret('FACEBOOK_API_KEY')
    api_endpoint = await get_secret('FACEBOOK_API_ENDPOINT')
    if not api_key or not api_endpoint:
        logging.error("Facebook API key or endpoint not found in environment variables")
        return {"Error": "Facebook API key or endpoint not found in environment variables"}

    # Trim any whitespace and encode the API endpoint
    api_endpoint = api_endpoint.strip()
    api_endpoint = urllib.parse.quote(api_endpoint, safe=':/')
    logging.info(f"Facebook API Endpoint: {api_endpoint}")

    headers = {
        'Authorization': f'Bearer {api_key}'
    }

    # Extract the profile ID or username from the URL
    profile_id = url.split('/')[-1].strip()

    # Add the id as a query parameter
    params = {'id': profile_id}

    try:
        result = await rate_limited_api_call(session, bucket, api_endpoint, headers, params=params, method='POST')
        if 'error' not in result:
            return {
                "Full Name": result.get('name', ''),
                "Location": result.get('location', ''),
                "Phone": result.get('phone', ''),
                "Email": result.get('email', ''),
                "Follower Count": result.get('follower_count', ''),
                "Friend Count": result.get('friend_count', '')
            }
        else:
            error_message = f"API error for Facebook profile {url}: {result.get('error')}"
            logging.error(error_message)
            return {"Error": error_message}
    except aiohttp.ClientResponseError as e:
        error_message = f"HTTP error when scraping Facebook profile {url}: {e.status} {e.message}"
        logging.error(error_message)
        return {"Error": error_message}
    except Exception as e:
        error_message = f"Error scraping Facebook profile {url}: {str(e)}"
        logging.error(error_message)
        return {"Error": error_message}


async def process_csv(csv_file, scrape_function):
    # Read profile URLs from the first CSV column and scrape them concurrently.
    try:
        with open(csv_file.name, 'r') as file:
            reader = csv.reader(file)
            profile_urls = [row[0] for row in reader if row]

        linkedin_bucket = TokenBucket(LINKEDIN_RATE_LIMIT, MAX_TOKENS)
        facebook_bucket = TokenBucket(FACEBOOK_RATE_LIMIT, MAX_TOKENS)

        async with aiohttp.ClientSession() as session:
            tasks = []
            for url in profile_urls:
                if 'linkedin.com' in url:
                    tasks.append(scrape_function(session, linkedin_bucket, url))
                elif 'facebook.com' in url:
                    tasks.append(scrape_function(session, facebook_bucket, url))
                else:
                    tasks.append(asyncio.create_task(asyncio.sleep(0)))  # Dummy task for unsupported URLs
            results = await asyncio.gather(*tasks)

        return json.dumps([r for r in results if r is not None], indent=2)
    except Exception as e:
        logging.error(f"Error processing CSV file: {str(e)}")
        return f"Error processing CSV file: {str(e)}"


async def preview_csv(csv_file, scrape_function):
    # Scrape only the first URL in the CSV so the user can sanity-check the output.
    try:
        with open(csv_file.name, 'r') as file:
            reader = csv.reader(file)
            profile_urls = [row[0] for row in reader if row]

        if not profile_urls:
            return "No URLs found in the CSV file."

        preview_url = profile_urls[0]
        bucket = TokenBucket(LINKEDIN_RATE_LIMIT if 'linkedin.com' in preview_url else FACEBOOK_RATE_LIMIT, MAX_TOKENS)

        async with aiohttp.ClientSession() as session:
            preview_data = await scrape_function(session, bucket, preview_url)

        if preview_data:
            return json.dumps(preview_data, indent=2)
        else:
            return "Error fetching preview data."
    except Exception as e:
        logging.error(f"Error previewing CSV file: {str(e)}")
        return f"Error previewing CSV file: {str(e)}"


def create_interface(name, scrape_function):
    input_file = gr.File(label=f"Upload CSV file with {name} profile URLs")
    preview = gr.Textbox(label="Preview (First Profile)", max_lines=10)
    output = gr.Textbox(label=f"Scraped {name} Data")
    preview_button = gr.Button("Preview")
    clear_preview_button = gr.Button("Clear Preview")
    scrape_button = gr.Button(f"Scrape {name}")
    download = gr.File(label="Download Scraped Data as CSV")

    def clear_preview():
        return ""

    def scrape_and_download(csv_file):
        result = asyncio.run(process_csv(csv_file, scrape_function))
        if isinstance(result, str) and result.startswith("Error"):
            return None

        data = json.loads(result)
        output_file = f"{scrape_function.__name__}_scraped_data.csv"

        def clean_text(text):
            return str(text).replace('\n', ' ').strip()

        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for item in data:
                if "Error" in item:
                    writer.writerow(["Error", clean_text(item["Error"])])
                else:
                    for key, value in item.items():
                        writer.writerow([f"{key}: {clean_text(value)}"])
                writer.writerow([])  # Empty row between profiles
        return output_file

    preview_button.click(lambda x: asyncio.run(preview_csv(x, scrape_function)), inputs=input_file, outputs=preview)
    clear_preview_button.click(clear_preview, inputs=None, outputs=preview)
    scrape_button.click(lambda x: asyncio.run(process_csv(x, scrape_function)), inputs=input_file, outputs=output)
    scrape_button.click(scrape_and_download, inputs=input_file, outputs=download)

    return input_file, preview, output, download


def create_app():
    with gr.Blocks() as demo:
        gr.Markdown("# Multi-functional Data Processing Application")
        with gr.Tab("LinkedIn Scraper"):
            create_interface("LinkedIn", scrape_linkedin_profile)
        with gr.Tab("Facebook Scraper"):
            create_interface("Facebook", scrape_facebook_profile)
    return demo


# Main application
app = create_app()

if __name__ == "__main__":
    logging.info(f"Facebook API Endpoint: {os.environ.get('FACEBOOK_API_ENDPOINT', 'Not set')}")
    if "SPACE_ID" in os.environ:
        app.launch()
    else:
        app.launch(share=True)
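
# --- Usage notes (a sketch of expected inputs, not part of the app logic) ---
# The scrapers above assume these environment variables are set before launch:
#   LINKEDIN_API_KEY       - bearer token for the Zyla LinkedIn profile scraper endpoint
#   FACEBOOK_API_KEY       - bearer token for the Facebook scraping API
#   FACEBOOK_API_ENDPOINT  - base URL of the Facebook scraping API
# The uploaded CSV is read with csv.reader and only the first column is used,
# one profile URL per row; rows whose URL contains neither 'linkedin.com' nor
# 'facebook.com' are ignored. A hypothetical input file might look like:
#   https://www.linkedin.com/in/some-profile
#   https://www.facebook.com/some.profile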