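"""Web scraper for University at Buffalo international student services pages.

Crawls buffalo.edu starting from the ISS landing page, extracts cleaned text,
headings, FAQ pairs, important links, and category tags from HTML and PDF
pages, and writes one JSON document per page to the output directory.
"""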
import os
import json
import datetime
import re
import time
from typing import Any, Dict, List, Optional, Set
from urllib.parse import urlparse, urljoin

from bs4 import BeautifulSoup
from playwright.sync_api import sync_playwright
import trafilatura
import pymupdf


class BuffaloScraper:
    """Breadth-first crawler for the UB international student services site."""

    def __init__(self, seed_url: str = "https://www.buffalo.edu/international-student-services.html",
                 output_dir: str = "data/raw"):
        self.seed_url = seed_url
        self.output_dir = output_dir
        self.visited_urls: Set[str] = set()
        self.queue: List[str] = [seed_url]
        self.domain = urlparse(seed_url).netloc

        os.makedirs(output_dir, exist_ok=True)

        # Phrases that mark boilerplate header/footer lines to be filtered out.
        self.useless_keywords = [
            "privacy policy", "terms of use", "cookie", "last updated",
            "©", "copyright", "follow us", "social media",
            "related links", "site map", "skip to content", "all rights reserved"
        ]

    def is_valid_url(self, url: str) -> bool:
        """Check if URL should be scraped."""
        parsed = urlparse(url)

        # Stay within the buffalo.edu domain.
        if not parsed.netloc.endswith('buffalo.edu'):
            return False

        # Skip static assets.
        if parsed.path.endswith(('.jpg', '.jpeg', '.png', '.gif', '.css', '.js')):
            return False

        if url in self.visited_urls:
            return False

        # Skip utility pages, non-HTTP schemes, and social media links.
        skip_patterns = [
            '/search', '/login', '/user', '/admin', '/cart', '/account',
            'javascript:', 'mailto:', 'tel:', '#', 'facebook.com', 'twitter.com',
            'instagram.com', 'youtube.com', 'linkedin.com'
        ]
        if any(pattern in url.lower() for pattern in skip_patterns):
            return False

        return True

    def is_useless_line(self, line: str) -> bool:
        """Check if a line of text is likely useless footer/header content."""
        line = line.lower()
        return any(kw in line for kw in self.useless_keywords)

    def is_valid_line(self, line: str) -> bool:
        """Check if a line is valid content."""
        if not line or len(line.strip().split()) < 3:
            return False
        if self.is_useless_line(line):
            return False
        return True

    def is_heading_like(self, line: str) -> bool:
        """Check if a line is likely a heading."""
        line = line.strip()
        word_count = len(line.split())
        return (
            (line.isupper() and word_count <= 10) or
            (len(line) < 100 and word_count <= 15 and line.endswith((':', '?')))
        )

    def extract_clean_content(self, html: str) -> str:
        """Extract clean content with smart filtering."""
        soup = BeautifulSoup(html, "html.parser")

        # Drop non-content elements before any fallback text extraction.
        for tag in soup(["script", "style", "header", "footer", "nav", "aside"]):
            tag.decompose()

        # Prefer trafilatura's main-content extraction.
        trafilatura_content = trafilatura.extract(html, include_tables=True,
                                                  include_images=False,
                                                  include_links=True,
                                                  output_format='txt')

        # Fall back to line-based BeautifulSoup extraction when trafilatura
        # returns nothing or very little.
        if not trafilatura_content or len(trafilatura_content) < 200:
            raw_text = soup.get_text(separator="\n")
            lines = raw_text.split("\n")
            clean_lines = []

            for line in lines:
                line = line.strip()
                if not self.is_valid_line(line):
                    continue
                clean_lines.append(line)

            # Re-join wrapped lines into paragraphs, keeping headings and
            # bullet items on their own lines.
            formatted_text = ""
            buffer = ""

            for line in clean_lines:
                if self.is_heading_like(line) or line.startswith(("-", "*", "•")):
                    if buffer:
                        formatted_text += buffer.strip() + "\n\n"
                        buffer = ""
                    formatted_text += line.strip() + "\n"
                else:
                    buffer += line + " "

            if buffer:
                formatted_text += buffer.strip() + "\n"

            return formatted_text.strip()

        return trafilatura_content

    def extract_content(self, html: str, url: str) -> Dict[str, Any]:
        """Extract structured content from HTML."""
        soup = BeautifulSoup(html, 'html.parser')

        # Page title.
        title = soup.title.text.strip() if soup.title else ""

        # Main body text.
        content = self.extract_clean_content(html)

        # Heading outline.
        headings = []
        for h in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
            headings.append({
                'level': int(h.name[1]),
                'text': h.get_text(strip=True)
            })

        # FAQ pairs from common accordion/FAQ widgets.
        faqs = []
        faq_containers = soup.select('.accordion, .faq, .collapse, .panel-group, .question-answer, details')
        for container in faq_containers:
            question_selectors = ['.accordion-header', '.faq-question', '.card-header',
                                  'summary', '.question', 'dt', 'h3', 'h4', '.panel-title']
            answer_selectors = ['.accordion-body', '.faq-answer', '.card-body',
                                '.answer', 'dd', '.panel-body', 'p']

            questions = container.select(', '.join(question_selectors))
            answers = container.select(', '.join(answer_selectors))

            # Pair questions with answers by position.
            for i, q in enumerate(questions):
                if i < len(answers):
                    faqs.append({
                        'question': q.get_text(strip=True),
                        'answer': answers[i].get_text(strip=True)
                    })

        # Heuristic: a paragraph ending in "?" followed by another paragraph
        # is treated as a question/answer pair.
        p_texts = [p.get_text(strip=True) for p in soup.find_all('p')]
        for i, text in enumerate(p_texts):
            if i < len(p_texts) - 1 and text.strip().endswith('?'):
                faqs.append({
                    'question': text,
                    'answer': p_texts[i + 1]
                })

        # Links whose anchor text suggests forms or key documents.
        important_links = []
        for a in soup.find_all('a', href=True):
            link_text = a.get_text(strip=True)
            href = a['href']
            if link_text and any(keyword in link_text.lower() for keyword in
                                 ['form', 'document', 'application', 'guide', 'i-20', 'opt', 'cpt']):
                important_links.append({
                    'text': link_text,
                    'url': urljoin(url, href)  # resolve relative hrefs against the page URL
                })

        # URL path hierarchy.
        parsed = urlparse(url)
        path_parts = [p for p in parsed.path.strip("/").split("/") if p]

        # Keyword-based category tags.
        categories = []
        if re.search(r'\b(visa|i-20|i20|sevis|immigration)\b', content, re.I):
            categories.append('immigration')
        if re.search(r'\b(opt|cpt|employment|work|job|internship)\b', content, re.I):
            categories.append('employment')
        if re.search(r'\b(tuition|fee|payment|cost|financial)\b', content, re.I):
            categories.append('fees')
        if re.search(r'\b(housing|accommodation|apartment|dorm|living)\b', content, re.I):
            categories.append('housing')

        document = {
            'url': url,
            'title': title,
            'content': content,
            'headings': headings,
            'faqs': faqs,
            'important_links': important_links,
            'categories': categories,
            'scraped_at': datetime.datetime.now().isoformat(),
            'path_hierarchy': path_parts,
            'domain': parsed.netloc
        }

        return document

    def extract_links(self, html: str, base_url: str) -> List[str]:
        """Extract all links from the page."""
        soup = BeautifulSoup(html, 'html.parser')
        links = []

        for a in soup.find_all('a', href=True):
            href = a['href']

            # Resolve relative URLs, drop fragments, and normalize trailing slashes.
            full_url = urljoin(base_url, href)
            full_url = full_url.split('#')[0]
            full_url = full_url.rstrip('/')

            if self.is_valid_url(full_url):
                links.append(full_url)

        return links

    def process_pdf(self, url: str) -> Optional[Dict[str, Any]]:
        """Download and extract text from a PDF."""
        with sync_playwright() as p:
            # Fetch the raw PDF bytes with Playwright's HTTP client; navigating
            # to the URL and calling page.pdf() would re-render the page rather
            # than return the original document.
            request_context = p.request.new_context()
            try:
                response = request_context.get(url, timeout=60000)
                pdf_data = response.body()
            except Exception as e:
                print(f"Error downloading PDF {url}: {str(e)}")
                return None
            finally:
                request_context.dispose()

        # Write to a temporary file so PyMuPDF can open it.
        temp_path = os.path.join(self.output_dir, "temp.pdf")
        with open(temp_path, "wb") as f:
            f.write(pdf_data)

        # Extract text page by page.
        doc = pymupdf.open(temp_path)
        text = ""
        for page_num in range(doc.page_count):
            page = doc[page_num]
            text += page.get_text()
        doc.close()

        os.remove(temp_path)

        parsed = urlparse(url)
        path_parts = [p for p in parsed.path.strip("/").split("/") if p]
        filename = os.path.basename(url)

        # Keyword-based category tags (same scheme as HTML pages).
        categories = []
        if re.search(r'\b(visa|i-20|i20|sevis|immigration)\b', text, re.I):
            categories.append('immigration')
        if re.search(r'\b(opt|cpt|employment|work|job|internship)\b', text, re.I):
            categories.append('employment')
        if re.search(r'\b(tuition|fee|payment|cost|financial)\b', text, re.I):
            categories.append('fees')
        if re.search(r'\b(housing|accommodation|apartment|dorm|living)\b', text, re.I):
            categories.append('housing')

        document = {
            'url': url,
            'title': filename,
            'content': text,
            'document_type': 'pdf',
            'categories': categories,
            'scraped_at': datetime.datetime.now().isoformat(),
            'path_hierarchy': path_parts,
            'domain': parsed.netloc
        }

        return document

    def scrape(self, max_pages: int = 100, max_depth: int = 4) -> None:
        """Main scraping loop: breadth-first crawl from the seed URL."""
        pages_scraped = 0
        depth_map = {self.seed_url: 0}

        with sync_playwright() as p:
            browser = p.chromium.launch()
            page = browser.new_page()

            while self.queue and pages_scraped < max_pages:
                url = self.queue.pop(0)
                current_depth = depth_map.get(url, 0)

                if current_depth > max_depth:
                    continue

                if url in self.visited_urls:
                    continue

                try:
                    print(f"Scraping: {url} (depth: {current_depth})")
                    self.visited_urls.add(url)

                    if url.lower().endswith('.pdf'):
                        document = self.process_pdf(url)
                        if document:
                            filename = f"{pages_scraped:04d}_{urlparse(url).netloc.replace('.', '_')}.json"
                            filepath = os.path.join(self.output_dir, filename)
                            with open(filepath, 'w') as f:
                                json.dump(document, f, indent=2)

                            pages_scraped += 1
                    else:
                        try:
                            page.goto(url, timeout=30000)
                            page.wait_for_load_state('networkidle', timeout=10000)
                            html = page.content()

                            document = self.extract_content(html, url)

                            filename = f"{pages_scraped:04d}_{urlparse(url).netloc.replace('.', '_')}.json"
                            filepath = os.path.join(self.output_dir, filename)
                            with open(filepath, 'w') as f:
                                json.dump(document, f, indent=2)

                            pages_scraped += 1

                            # Enqueue newly discovered links one level deeper.
                            if current_depth < max_depth:
                                links = self.extract_links(html, url)
                                for link in links:
                                    if link not in self.visited_urls and link not in self.queue:
                                        self.queue.append(link)
                                        depth_map[link] = current_depth + 1
                        except Exception as e:
                            print(f"Error processing page {url}: {str(e)}")
                            continue

                except Exception as e:
                    print(f"Error scraping {url}: {str(e)}")

                # Be polite to the server between requests.
                time.sleep(1)

            browser.close()

        print(f"Scraping completed. Scraped {pages_scraped} pages.")


if __name__ == "__main__":
    scraper = BuffaloScraper()
    scraper.scrape(max_pages=100, max_depth=4)