# Research_Assistant / scraper.py
import asyncio
import aiohttp
import random
from bs4 import BeautifulSoup
from googlesearch import search
from urllib.parse import urlparse
import os
async def fetch(url, session, retries=3, delay=1):
    """Fetch a URL, retrying with exponential backoff on HTTP 429 responses."""
    if retries == 0:
        print("Maximum retries exceeded. Failed to fetch URL.")
        return None
    try:
        # raise_for_status=True makes non-2xx responses raise ClientResponseError,
        # so 429s can be caught and retried below.
        async with session.get(url, raise_for_status=True) as response:
            # latin-1 decodes any byte sequence, at the cost of possibly mangling non-Latin text
            return await response.text(encoding='latin')
    except aiohttp.ClientResponseError as e:
        if e.status == 429:  # HTTP 429: Too Many Requests
            print(f"Too many requests. Retrying in {delay} seconds...")
            await asyncio.sleep(delay)
            return await fetch(url, session, retries=retries - 1, delay=delay * 2)
        print(f"Error fetching URL: {e}")
        return None
async def scrape_websites(topic, num_results_per_link=10):
    outputs = await scrape_google(topic, num_results_per_link)
    # Select a random subset of the scraped results based on the user's input
    selected_links = random.sample(outputs, min(num_results_per_link, len(outputs)))
    # return the list of strings as a single string
    # return "\n".join(selected_links)
    return selected_links
async def scrape_google(topic, num_results=15) -> list[str]:
    # Cap the number of results at 15
    num_results = min(num_results, 15)
    # Asynchronous HTTP session shared by all page fetches
    async with aiohttp.ClientSession() as session:
        # Perform Google search, over-fetching so duplicates and failed fetches can be discarded;
        # stop= bounds the googlesearch result generator so it terminates
        search_results = search(topic, num=3 * num_results, stop=3 * num_results)
        search_results = remove_duplicate_results(search_results)
        # Shuffle the order of the search results
        random.shuffle(search_results)
        outlines = []
        i = 0
        # Keep scraping URLs until num_results are collected, or all search results are exhausted
        while len(outlines) < num_results and i < len(search_results):
            result = await scrape_url(search_results[i], session)
            if result:
                outlines.append(result)
            i += 1
        return outlines
async def scrape_url(url, session) -> str:
    try:
        # Fetch HTML content asynchronously
        html_content = await fetch(url, session)
        if not html_content:
            return ''
        # Parse HTML content with BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')
        # Extract outlines (headings) from the parsed HTML
        outlines = extract_outlines(soup)
        # If outlines exist, combine them with the website info
        if outlines:
            return get_website_info_str(url, outlines)
        return ''
    except Exception as e:
        print(f"Error '{e}' while processing URL: {url}")
        return ''
# Minimum length threshold for relevant outlines
MIN_LENGTH_THRESHOLD = 20
is_irrelevant_outline = lambda outline: len(outline) < MIN_LENGTH_THRESHOLD
extract_main_domain_from_url = lambda url: urlparse(url).netloc.split('www.')[-1].split('/')[0]
extract_website_name_from_url = lambda url: urlparse(url).netloc
extract_title_from_url = lambda url: os.path.basename(urlparse(url).path)
def remove_duplicate_results(search_result) -> list:
    unique_domains = set()
    unique_websites = []
    for url in search_result:
        domain = extract_main_domain_from_url(url)
        if domain not in unique_domains:
            unique_domains.add(domain)
            unique_websites.append(url)
    return unique_websites
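# Quick illustration (assumed URLs): remove_duplicate_results keeps only the first
# URL seen for each main domain, so the second example.com link is dropped.
def _demo_remove_duplicates():
    urls = [
        "https://www.example.com/page-1",
        "https://example.com/page-2",   # same main domain as above -> dropped
        "https://docs.python.org/3/",
    ]
    return remove_duplicate_results(urls)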
def get_website_info_str(url, outlines) -> str:
    website_name = extract_website_name_from_url(url)
    info = f"Website: {website_name}\nURL: {url}\nOutlines:\n." + '\n.'.join(outlines)
    return info
def filter_outlines(outlines) -> list[str]:
    return [outline for outline in set(outlines) if not is_irrelevant_outline(outline)]
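# Quick illustration (assumed headings): filter_outlines removes duplicates and
# headings shorter than MIN_LENGTH_THRESHOLD characters.
def _demo_filter_outlines():
    headings = [
        "A sufficiently long heading about the topic",
        "Short",                                        # dropped: under 20 characters
        "A sufficiently long heading about the topic",  # dropped: duplicate
    ]
    return filter_outlines(headings)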
def extract_outlines(soup) -> list[str]:
    headings = soup.find_all(['h1', 'h2', 'h3', 'h4'])
    outlines = [heading.text.strip() for heading in headings]
    # Require at least 3 headings for the page to be considered useful
    if len(outlines) >= 3:
        return filter_outlines(outlines)
    return []
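# Usage sketch (illustrative; the topic string and counts are placeholders):
# run the full pipeline and print the collected outlines for a topic.
if __name__ == "__main__":
    results = asyncio.run(scrape_websites("machine learning", num_results_per_link=3))
    for entry in results:
        print(entry)
        print('-' * 40)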