improve scraper
scraper.py +107 -0
scraper.py
ADDED
@@ -0,0 +1,107 @@
import asyncio
import aiohttp
import random
from bs4 import BeautifulSoup
from googlesearch import search
from urllib.parse import urlparse
import os


async def fetch(url, session, retries=3, delay=1):
    """Fetch a URL, retrying with exponential backoff on HTTP 429."""
    if retries == 0:
        print("Maximum retries exceeded. Failed to fetch URL.")
        return None
    try:
        # raise_for_status=True makes aiohttp raise ClientResponseError on 4xx/5xx,
        # so the 429 branch below can actually trigger
        async with session.get(url, raise_for_status=True) as response:
            return await response.text(encoding='latin')
    except aiohttp.ClientResponseError as e:
        if e.status == 429:  # HTTP 429: Too Many Requests
            print(f"Too many requests. Retrying in {delay} seconds...")
            await asyncio.sleep(delay)
            # await the recursive call, otherwise a coroutine object is returned
            return await fetch(url, session, retries=retries - 1, delay=delay * 2)
        print(f"Error fetching URL: {e}")
        return None


async def scrape_websites(topic, num_results_per_link=10):
    outputs = await scrape_google(topic, num_results_per_link)
    # Select random links based on the user's input
    selected_links = random.sample(outputs, min(num_results_per_link, len(outputs)))
    # return the list of strings as a single string
    # return "\n".join(selected_links)
    return list(selected_links)


async def scrape_google(topic, num_results=15) -> list[str]:
    # Limit search results to 15 if num_results exceeds 15
    num_results = min(num_results, 15)
    # Asynchronous HTTP session
    async with aiohttp.ClientSession() as session:
        # Perform Google search
        search_results = search(topic, num=3 * num_results)
        search_results = remove_duplicate_results(search_results)
        # Shuffle search results order
        random.shuffle(search_results)
        outlines = []
        i = 0
        # Keep scraping URLs until the wanted num_results is collected,
        # or until every search result has been tried
        while len(outlines) < num_results and i < len(search_results):
            result = await scrape_url(search_results[i], session)
            if result:
                outlines.append(result)
            i += 1
        return outlines


async def scrape_url(url, session) -> str:
    try:
        # Fetch HTML content asynchronously
        html_content = await fetch(url, session)
        if not html_content:
            return ''
        # Parse HTML content with BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')
        # Extract outlines from parsed HTML
        outlines = extract_outlines(soup)

        # If outlines exist, accumulate them with website info
        if outlines:
            return get_website_info_str(url, outlines)
        return ''

    except Exception as e:
        print(f"Error '{e}' while processing URL: {url}")
        return ''


# Minimum length threshold for relevant outlines
MIN_LENGTH_THRESHOLD = 20
is_irrelevant_outline = lambda outline: len(outline) < MIN_LENGTH_THRESHOLD
extract_main_domain_from_url = lambda url: urlparse(url).netloc.split('www.')[-1].split('/')[0]
extract_website_name_from_url = lambda url: urlparse(url).netloc
extract_title_from_url = lambda url: os.path.basename(urlparse(url).path)


def remove_duplicate_results(search_result) -> list:
    # Keep only the first URL seen for each domain
    unique_domains = set()
    unique_websites = []
    for url in search_result:
        domain = extract_main_domain_from_url(url)
        if domain not in unique_domains:
            unique_domains.add(domain)
            unique_websites.append(url)
    return unique_websites


def get_website_info_str(url, outlines) -> str:
    website_name = extract_website_name_from_url(url)
    info = f"Website: {website_name}\nURL: {url}\nOutlines:\n." + '\n.'.join(outlines)
    return info


def filter_outlines(outlines) -> list[str]:
    # Drop duplicates and outlines that are too short to be meaningful
    return [outline for outline in set(outlines) if not is_irrelevant_outline(outline)]


def extract_outlines(soup) -> list[str]:
    headings = soup.find_all(['h1', 'h2', 'h3', 'h4'])
    outlines = [heading.text.strip() for heading in headings]
    # Only keep pages with at least three headings; otherwise return an empty list
    if len(outlines) >= 3:
        return filter_outlines(outlines)
    return []