import asyncio
import aiohttp
import random
from bs4 import BeautifulSoup
from googlesearch import search
from urllib.parse import urlparse


async def fetch(url, session, retries=3):
    for attempt in range(retries):
        try:
            # raise_for_status is needed so HTTP error codes surface as
            # ClientResponseError and the 429 handler below can fire
            async with session.get(url, raise_for_status=True) as response:
                return await response.text()
        except aiohttp.ClientResponseError as e:
            if e.status == 429:  # HTTP 429: Too Many Requests
                delay = 2 ** attempt  # Exponential backoff
                print(f"Too many requests. Retrying in {delay} seconds...")
                await asyncio.sleep(delay)
            else:
                print(f"Error fetching URL: {e}")
                return None
    print("Maximum retries exceeded. Failed to fetch URL.")
    return None


async def scrape_google(topic, num_results=20) -> str:
    # Set of domains already processed, used to skip duplicates
    unique_websites = set()
    # Accumulator for all scraped outlines
    outlines_str = ""
    # Asynchronous HTTP session shared across all requests
    async with aiohttp.ClientSession() as session:
        # Perform the Google search
        search_results = list(search(topic, num=num_results, stop=num_results))
        # Sample at most 15 URLs, and never more than the search returned
        sample_size = min(len(search_results), num_results, 15)
        search_results = random.sample(search_results, sample_size)
        # Iterate through the sampled results
        for url in search_results:
            # Stop once the desired number of websites has been scraped
            if len(unique_websites) >= num_results:
                break
            # Scrape outlines from the URL and accumulate them
            outlines_str += await scrape_url(url, session, unique_websites)
    # Return the accumulated outlines
    return outlines_str


async def scrape_url(url, session, unique_websites) -> str:
    try:
        # Extract the main domain from the URL, stripping any leading "www."
        domain = urlparse(url).netloc.split('www.')[-1].split('/')[0]
        # Skip domains that have already been processed
        if domain in unique_websites:
            return ""
        # Fetch the HTML content asynchronously
        html_content = await fetch(url, session)
        if html_content is None:
            return ""
        # Parse the HTML with BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')
        # Extract outlines (headings) from the parsed HTML
        outlines = extract_outlines(soup)
        # Skip pages with fewer than three outlines
        if len(outlines) < 3:
            return ""
        # Filter out irrelevant outlines
        outlines = filter_outlines(outlines)
        outlines_str = ""
        # If outlines remain, accumulate them with the website info
        if outlines:
            website_name = urlparse(url).netloc  # Website name from the URL
            outlines_str += f"Website: {website_name}\n"
            outlines_str += f"URL: {url}\n"
            outlines_str += "Outlines:\n"
            for outline in outlines:
                outlines_str += ". " + outline + "\n"
            outlines_str += "----------------------------------------------------------------------------------------------\n"
        # Mark the domain as processed
        unique_websites.add(domain)
        return outlines_str
    except Exception as e:
        # Log the failure and return an empty string
        print("Error processing URL:", url)
        print(e)
        return ""


def is_irrelevant_outline(outline):
    # Minimum length threshold for relevant outlines
    min_length_threshold = 20
    return len(outline) < min_length_threshold


def filter_outlines(outlines):
    # Drop short headings, then deduplicate while preserving page order
    filtered_outlines = [outline for outline in outlines if not is_irrelevant_outline(outline)]
    return list(dict.fromkeys(filtered_outlines))


def extract_outlines(soup):
    # Collect the text of every h1-h4 heading as an outline entry
    outlines = []
    headings = soup.find_all(['h1', 'h2', 'h3', 'h4'])
    for heading in headings:
        outlines.append(heading.text.strip())
    return outlines
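

# Example usage: a minimal sketch, assuming this file is run as a script.
# scrape_google is a coroutine, so it needs an event loop; the query string
# below is a hypothetical placeholder.
if __name__ == "__main__":
    result = asyncio.run(scrape_google("content marketing strategies", num_results=10))
    print(result)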