# WebScr / app.py
import os
import csv
import urllib.parse
import json
import logging
import asyncio
import aiohttp
import time
import random
from aiohttp import ClientSession
from typing import Dict, Any
import gradio as gr
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Rate limiting configuration
LINKEDIN_RATE_LIMIT = 1   # token-bucket refill rate for LinkedIn calls, in requests per second
FACEBOOK_RATE_LIMIT = 1   # token-bucket refill rate for Facebook calls, in requests per second
RATE_LIMIT_PERIOD = 1     # seconds; currently unused (the buckets already work in tokens per second)
MAX_TOKENS = 1            # bucket capacity, i.e. the maximum burst size
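# TokenBucket is a simple token-bucket rate limiter: tokens refill continuously
# at `rate` tokens per second up to `max_tokens`, and each API call consumes one
# token. wait_for_token() sleeps until a token is available, so concurrent
# scrape tasks are throttled without blocking the event loop.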
class TokenBucket:
def __init__(self, rate: float, max_tokens: int):
self.rate = rate
self.max_tokens = max_tokens
self.tokens = max_tokens
self.last_refill = time.monotonic()
self.lock = asyncio.Lock()
async def get_token(self):
async with self.lock:
now = time.monotonic()
time_passed = now - self.last_refill
self.tokens = min(self.max_tokens, self.tokens + time_passed * self.rate)
self.last_refill = now
if self.tokens >= 1:
self.tokens -= 1
return True
else:
return False
async def wait_for_token(self):
while not await self.get_token():
await asyncio.sleep(1 / self.rate)
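# Secrets (API keys and endpoints) are read from environment variables, which is
# where Hugging Face Space secrets are exposed at runtime.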
async def get_secret(secret_name: str) -> str:
return os.environ.get(secret_name)
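# Generic rate-limited HTTP call: wait for a bucket token, add a small random
# delay, then issue a GET or POST. On HTTP 503 the call is retried up to
# max_retries times with exponential backoff plus jitter; other errors are
# logged and re-raised.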
async def rate_limited_api_call(session: ClientSession, bucket: TokenBucket, url: str, headers: Dict[str, str], params: Dict[str, Any] = None, data: Dict[str, Any] = None, method: str = 'GET') -> Dict[str, Any]:
max_retries = 5
base_delay = 5 # seconds
for attempt in range(max_retries):
await bucket.wait_for_token()
# Add a small random delay before each API call
await asyncio.sleep(random.uniform(0.1, 0.5))
try:
if method == 'GET':
async with session.get(url, headers=headers, params=params) as response:
response.raise_for_status()
return await response.json()
elif method == 'POST':
async with session.post(url, headers=headers, params=params, json=data) as response:
response.raise_for_status()
return await response.json()
except aiohttp.ClientResponseError as e:
if e.status == 503 and attempt < max_retries - 1:
retry_delay = base_delay * (2 ** attempt) + random.uniform(0, 1) # Exponential backoff with jitter
logging.warning(f"Attempt {attempt + 1} failed with 503 error. Retrying in {retry_delay:.2f} seconds...")
await asyncio.sleep(retry_delay)
else:
logging.error(f"API call failed: {str(e)}")
raise
except Exception as e:
logging.error(f"Unexpected error in API call: {str(e)}")
raise
raise Exception(f"Failed after {max_retries} attempts")
async def scrape_linkedin_profile(session: ClientSession, bucket: TokenBucket, url: str) -> Dict[str, Any]:
api_key = await get_secret('LINKEDIN_API_KEY')
api_endpoint = "https://zylalabs.com/api/1511/linkedin+profile+scraper+api/5771/profile+scraper"
if not api_key:
logging.error("LinkedIn API key not found in environment variables")
return {"Error": "LinkedIn API key not found"}
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
data = {
"link": url
}
try:
result = await rate_limited_api_call(session, bucket, api_endpoint, headers, data=data, method='POST')
if 'data' in result:
profile_data = result['data']
return {
"Name": profile_data.get('fullName', ''),
"Number of Followers": profile_data.get('followers', ''),
"Number of Connections": profile_data.get('connections', ''),
"Age": profile_data.get('age', ''),
"Current Role": profile_data.get('headline', ''),
"Sex": profile_data.get('gender', ''),
"Place of living": profile_data.get('addressWithCountry', '')
}
else:
error_message = f"API error for LinkedIn profile {url}: {result.get('message', 'Unknown error')}"
logging.error(error_message)
return {"Error": error_message}
except Exception as e:
error_message = f"Error scraping LinkedIn profile {url}: {str(e)}"
logging.error(error_message)
return {"Error": error_message}
async def scrape_facebook_profile(session: ClientSession, bucket: TokenBucket, url: str) -> Dict[str, Any]:
api_key = await get_secret('FACEBOOK_API_KEY')
api_endpoint = await get_secret('FACEBOOK_API_ENDPOINT')
if not api_key or not api_endpoint:
logging.error("Facebook API key or endpoint not found in environment variables")
return {"Error": "Facebook API key or endpoint not found in environment variables"}
# Trim any whitespace and encode the API endpoint
api_endpoint = api_endpoint.strip()
api_endpoint = urllib.parse.quote(api_endpoint, safe=':/')
logging.info(f"Facebook API Endpoint: {api_endpoint}")
headers = {
'Authorization': f'Bearer {api_key}'
}
# Extract the profile ID or username from the URL
    profile_id = url.rstrip('/').split('/')[-1].strip()  # tolerate a trailing slash in the profile URL
# Add the id as a query parameter
params = {'id': profile_id}
try:
result = await rate_limited_api_call(session, bucket, api_endpoint, headers, params=params, method='POST')
if 'error' not in result:
return {
"Full Name": result.get('name', ''),
"Location": result.get('location', ''),
"Phone": result.get('phone', ''),
"Email": result.get('email', ''),
"Follower Count": result.get('follower_count', ''),
"Friend Count": result.get('friend_count', '')
}
else:
error_message = f"API error for Facebook profile {url}: {result.get('error')}"
logging.error(error_message)
return {"Error": error_message}
except aiohttp.ClientResponseError as e:
error_message = f"HTTP error when scraping Facebook profile {url}: {e.status} {e.message}"
logging.error(error_message)
return {"Error": error_message}
except Exception as e:
error_message = f"Error scraping Facebook profile {url}: {str(e)}"
logging.error(error_message)
return {"Error": error_message}
async def process_csv(csv_file, scrape_function):
try:
with open(csv_file.name, 'r') as file:
reader = csv.reader(file)
profile_urls = [row[0] for row in reader if row]
linkedin_bucket = TokenBucket(LINKEDIN_RATE_LIMIT, MAX_TOKENS)
facebook_bucket = TokenBucket(FACEBOOK_RATE_LIMIT, MAX_TOKENS)
async with aiohttp.ClientSession() as session:
tasks = []
for url in profile_urls:
if 'linkedin.com' in url:
tasks.append(scrape_function(session, linkedin_bucket, url))
elif 'facebook.com' in url:
tasks.append(scrape_function(session, facebook_bucket, url))
else:
tasks.append(asyncio.create_task(asyncio.sleep(0))) # Dummy task for unsupported URLs
results = await asyncio.gather(*tasks)
return json.dumps([r for r in results if r is not None], indent=2)
except Exception as e:
logging.error(f"Error processing CSV file: {str(e)}")
return f"Error processing CSV file: {str(e)}"
async def preview_csv(csv_file, scrape_function):
try:
with open(csv_file.name, 'r') as file:
reader = csv.reader(file)
profile_urls = [row[0] for row in reader if row]
if not profile_urls:
return "No URLs found in the CSV file."
preview_url = profile_urls[0]
bucket = TokenBucket(LINKEDIN_RATE_LIMIT if 'linkedin.com' in preview_url else FACEBOOK_RATE_LIMIT, MAX_TOKENS)
async with aiohttp.ClientSession() as session:
preview_data = await scrape_function(session, bucket, preview_url)
if preview_data:
return json.dumps(preview_data, indent=2)
else:
return "Error fetching preview data."
except Exception as e:
logging.error(f"Error previewing CSV file: {str(e)}")
return f"Error previewing CSV file: {str(e)}"
def create_interface(name, scrape_function):
input_file = gr.File(label=f"Upload CSV file with {name} profile URLs")
preview = gr.Textbox(label="Preview (First Profile)", max_lines=10)
output = gr.Textbox(label=f"Scraped {name} Data")
preview_button = gr.Button("Preview")
clear_preview_button = gr.Button("Clear Preview")
scrape_button = gr.Button(f"Scrape {name}")
download = gr.File(label="Download Scraped Data as CSV")
def clear_preview():
return ""
def scrape_and_download(csv_file):
result = asyncio.run(process_csv(csv_file, scrape_function))
if isinstance(result, str) and result.startswith("Error"):
return None
data = json.loads(result)
output_file = f"{scrape_function.__name__}_scraped_data.csv"
def clean_text(text):
return str(text).replace('\n', ' ').strip()
with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
for item in data:
if "Error" in item:
writer.writerow(["Error", clean_text(item["Error"])])
else:
for key, value in item.items():
writer.writerow([f"{key}: {clean_text(value)}"])
writer.writerow([]) # Empty row between profiles
return output_file
preview_button.click(lambda x: asyncio.run(preview_csv(x, scrape_function)), inputs=input_file, outputs=preview)
clear_preview_button.click(clear_preview, inputs=None, outputs=preview)
scrape_button.click(lambda x: asyncio.run(process_csv(x, scrape_function)), inputs=input_file, outputs=output)
scrape_button.click(scrape_and_download, inputs=input_file, outputs=download)
return input_file, preview, output, download
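# Assemble the Gradio Blocks app with one tab per supported network.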
def create_app():
with gr.Blocks() as demo:
gr.Markdown("# Multi-functional Data Processing Application")
with gr.Tab("LinkedIn Scraper"):
create_interface("LinkedIn", scrape_linkedin_profile)
with gr.Tab("Facebook Scraper"):
create_interface("Facebook", scrape_facebook_profile)
return demo
# Main application
app = create_app()
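# When running inside a Hugging Face Space (SPACE_ID is set) use the default
# launch; when running locally, also create a public share link.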
if __name__ == "__main__":
logging.info(f"Facebook API Endpoint:{os.environ.get('FACEBOOK_API_ENDPOINT', 'Not set')} ")
if "SPACE_ID" in os.environ:
app.launch()
else:
app.launch(share=True)