import asyncio
import os
import random
from urllib.parse import urlparse

import aiohttp
from bs4 import BeautifulSoup
from googlesearch import search


async def fetch(url, session, retries=3, delay=1):
    """Fetch the raw HTML of a URL, backing off exponentially on HTTP 429."""
    if retries == 0:
        print("Maximum retries exceeded. Failed to fetch URL.")
        return ''
    try:
        async with session.get(url) as response:
            # raise_for_status() is needed so error responses surface as ClientResponseError
            response.raise_for_status()
            return await response.text(encoding='latin')
    except aiohttp.ClientResponseError as e:
        if e.status == 429:  # HTTP 429: Too Many Requests
            print(f"Too many requests. Retrying in {delay} seconds...")
            await asyncio.sleep(delay)
            return await fetch(url, session, retries=retries - 1, delay=delay * 2)
        print(f"Error fetching URL: {e}")
        return ''


async def scrape_websites(topic, num_results_per_link=10):
    outputs = await scrape_google(topic, num_results_per_link)

    # Select random links based on the user's input
    selected_links = random.sample(outputs, min(num_results_per_link, len(outputs)))

    # return the list of strings as a single string
    # return "\n".join(selected_links)
    return list(selected_links)


async def scrape_google(topic, num_results=15) -> list[str]:
    # Limit search results to 15 if num_results exceeds 15
    num_results = min(num_results, 15)

    # Asynchronous HTTP session
    async with aiohttp.ClientSession() as session:
        # Perform Google search, over-fetching so duplicates and failed scrapes can be discarded.
        # This assumes the classic `googlesearch` package, whose search() accepts num/stop.
        search_results = search(topic, num=3 * num_results, stop=3 * num_results)
        search_results = remove_duplicate_results(search_results)

        # Shuffle search results order
        random.shuffle(search_results)

        outlines = []
        i = 0
        # Keep scraping URLs until the wanted num_results is collected,
        # or until every search result has been tried
        while len(outlines) < num_results and i < len(search_results):
            result = await scrape_url(search_results[i], session)
            if result:
                outlines.append(result)
            i += 1

    return outlines


async def scrape_url(url, session) -> str:
    try:
        # Fetch HTML content asynchronously
        html_content = await fetch(url, session)
        if not html_content:
            return ''

        # Parse HTML content with BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')

        # Extract outlines from parsed HTML
        outlines = extract_outlines(soup)

        # If outlines exist, accumulate them with website info
        if outlines:
            return get_website_info_str(url, outlines)
        return ''
    except Exception as e:
        print(f"Error '{e}' while processing URL: {url}")
        return ''


# Minimum length threshold for relevant outlines
MIN_LENGTH_THRESHOLD = 20

is_irrelevant_outline = lambda outline: len(outline) < MIN_LENGTH_THRESHOLD
extract_main_domain_from_url = lambda url: urlparse(url).netloc.split('www.')[-1].split('/')[0]
extract_website_name_from_url = lambda url: urlparse(url).netloc
extract_title_from_url = lambda url: os.path.basename(urlparse(url).path)


def remove_duplicate_results(search_result) -> list:
    # Keep only the first URL seen for each main domain
    unique_domains = set()
    unique_websites = []
    for url in search_result:
        domain = extract_main_domain_from_url(url)
        if domain not in unique_domains:
            unique_domains.add(domain)
            unique_websites.append(url)
    return unique_websites


def get_website_info_str(url, outlines) -> str:
    website_name = extract_website_name_from_url(url)
    info = f"Website: {website_name}\nURL: {url}\nOutlines:\n." + '\n.'.join(outlines)
    return info


def filter_outlines(outlines) -> list[str]:
    # Drop duplicates and headings too short to be meaningful
    return [outline for outline in set(outlines) if not is_irrelevant_outline(outline)]


def extract_outlines(soup) -> list[str]:
    # Treat h1-h4 headings as the page's outline
    headings = soup.find_all(['h1', 'h2', 'h3', 'h4'])
    outlines = [heading.text.strip() for heading in headings]
    # Ignore pages with fewer than three headings; they rarely yield a useful outline
    if len(outlines) >= 3:
        return filter_outlines(outlines)
    return []
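

# --- Usage sketch (not part of the original module) ---
# A minimal example of how these coroutines might be driven from a script
# entry point. The topic string and result count below are illustrative
# placeholders, not values taken from the original code.
if __name__ == "__main__":
    async def _demo():
        # Collect outline summaries for a few pages on an example topic
        results = await scrape_websites("async web scraping in python", num_results_per_link=3)
        for block in results:
            print(block)
            print("-" * 40)

    asyncio.run(_demo())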