import os
import json
import datetime
import re
import time
from typing import List, Dict, Any, Set
from urllib.parse import urlparse, urljoin

from bs4 import BeautifulSoup
from playwright.sync_api import sync_playwright
import trafilatura
import pymupdf


class BuffaloScraper:
    def __init__(self, seed_url: str = "https://www.buffalo.edu/international-student-services.html",
                 output_dir: str = "data/raw"):
        self.seed_url = seed_url
        self.output_dir = output_dir
        self.visited_urls: Set[str] = set()
        self.queue: List[str] = [seed_url]
        self.domain = urlparse(seed_url).netloc

        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # Keywords to filter useless content
        self.useless_keywords = [
            "privacy policy", "terms of use", "cookie", "last updated", "©",
            "copyright", "follow us", "social media", "related links",
            "site map", "skip to content", "all rights reserved"
        ]

    def is_valid_url(self, url: str) -> bool:
        """Check if URL should be scraped."""
        parsed = urlparse(url)

        # Only process buffalo.edu URLs
        if not parsed.netloc.endswith('buffalo.edu'):
            return False

        # Skip certain file types
        if parsed.path.endswith(('.jpg', '.jpeg', '.png', '.gif', '.css', '.js')):
            return False

        # Skip already visited URLs
        if url in self.visited_urls:
            return False

        # Skip certain patterns that are likely not content pages
        skip_patterns = [
            '/search', '/login', '/user', '/admin', '/cart', '/account',
            'javascript:', 'mailto:', 'tel:', '#',
            'facebook.com', 'twitter.com', 'instagram.com', 'youtube.com', 'linkedin.com'
        ]
        if any(pattern in url.lower() for pattern in skip_patterns):
            return False

        return True

    def is_useless_line(self, line: str) -> bool:
        """Check if a line of text is likely useless footer/header content."""
        line = line.lower()
        return any(kw in line for kw in self.useless_keywords)

    def is_valid_line(self, line: str) -> bool:
        """Check if a line is valid content."""
        if not line or len(line.strip().split()) < 3:
            return False
        if self.is_useless_line(line):
            return False
        return True

    def is_heading_like(self, line: str) -> bool:
        """Check if a line is likely a heading."""
        line = line.strip()
        word_count = len(line.split())
        return (
            (line.isupper() and word_count <= 10)
            or (len(line) < 100 and word_count <= 15 and line.endswith((':', '?')))
        )

    def extract_clean_content(self, html: str) -> str:
        """Extract clean content with smart filtering."""
        soup = BeautifulSoup(html, "html.parser")

        # Remove unwanted tags
        for tag in soup(["script", "style", "header", "footer", "nav", "aside"]):
            tag.decompose()

        # Try trafilatura first as it's often better at extracting main content
        trafilatura_content = trafilatura.extract(
            html,
            include_tables=True,
            include_images=False,
            include_links=True,
            output_format='txt'
        )

        # If trafilatura fails or returns little content, use our own extraction
        if not trafilatura_content or len(trafilatura_content) < 200:
            raw_text = soup.get_text(separator="\n")
            lines = raw_text.split("\n")

            clean_lines = []
            for line in lines:
                line = line.strip()
                if not self.is_valid_line(line):
                    continue
                clean_lines.append(line)

            # Merge lines smartly
            formatted_text = ""
            buffer = ""
            for line in clean_lines:
                # Treat as heading or list item
                if self.is_heading_like(line) or line.startswith(("-", "*", "•")):
                    if buffer:
                        formatted_text += buffer.strip() + "\n\n"
                        buffer = ""
                    formatted_text += line.strip() + "\n"
                else:
                    buffer += line + " "
            if buffer:
                formatted_text += buffer.strip() + "\n"
            return formatted_text.strip()

        return trafilatura_content

    def extract_content(self, html: str, url: str) -> Dict[str, Any]:
        """Extract structured content from HTML."""
        soup = BeautifulSoup(html, 'html.parser')

        # Extract title
        title = soup.title.text.strip() if soup.title else ""

        # Get cleaned content
        content = self.extract_clean_content(html)

        # Extract headings
        headings = []
        for h in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
            headings.append({
                'level': int(h.name[1]),
                'text': h.get_text(strip=True)
            })

        # Extract FAQs (common patterns in UB sites)
        faqs = []

        # Look for accordion elements, common FAQ containers
        faq_containers = soup.select('.accordion, .faq, .collapse, .panel-group, .question-answer, details')
        for container in faq_containers:
            # Look for question/answer pairs in various formats
            question_selectors = ['.accordion-header', '.faq-question', '.card-header', 'summary',
                                  '.question', 'dt', 'h3', 'h4', '.panel-title']
            answer_selectors = ['.accordion-body', '.faq-answer', '.card-body', '.answer',
                                'dd', '.panel-body', 'p']

            # Try to select using CSS selectors
            questions = container.select(', '.join(question_selectors))
            answers = container.select(', '.join(answer_selectors))

            # Match questions with answers
            for i, q in enumerate(questions):
                if i < len(answers):
                    faqs.append({
                        'question': q.get_text(strip=True),
                        'answer': answers[i].get_text(strip=True)
                    })

        # Also try to detect Q&A patterns in paragraphs
        p_texts = [p.get_text(strip=True) for p in soup.find_all('p')]
        for i, text in enumerate(p_texts):
            if i < len(p_texts) - 1 and text.strip().endswith('?'):
                faqs.append({
                    'question': text,
                    'answer': p_texts[i + 1]
                })

        # Extract important links
        important_links = []
        for a in soup.find_all('a', href=True):
            link_text = a.get_text(strip=True)
            href = a['href']
            if link_text and any(keyword in link_text.lower()
                                 for keyword in ['form', 'document', 'application', 'guide', 'i-20', 'opt', 'cpt']):
                important_links.append({
                    'text': link_text,
                    'url': href
                })

        # Metadata extraction from URL
        parsed = urlparse(url)
        path_parts = [p for p in parsed.path.strip("/").split("/") if p]

        # Try to categorize the content
        categories = []
        if re.search(r'\b(visa|i-20|i20|sevis|immigration)\b', content, re.I):
            categories.append('immigration')
        if re.search(r'\b(opt|cpt|employment|work|job|internship)\b', content, re.I):
            categories.append('employment')
        if re.search(r'\b(tuition|fee|payment|cost|financial)\b', content, re.I):
            categories.append('fees')
        if re.search(r'\b(housing|accommodation|apartment|dorm|living)\b', content, re.I):
            categories.append('housing')

        # Build structured document
        document = {
            'url': url,
            'title': title,
            'content': content,
            'headings': headings,
            'faqs': faqs,
            'important_links': important_links,
            'categories': categories,
            'scraped_at': datetime.datetime.now().isoformat(),
            'path_hierarchy': path_parts,
            'domain': parsed.netloc
        }
        return document

    def extract_links(self, html: str, base_url: str) -> List[str]:
        """Extract all links from the page."""
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        for a in soup.find_all('a', href=True):
            href = a['href']
            # Handle relative URLs
            full_url = urljoin(base_url, href)
            # Normalize URL
            full_url = full_url.split('#')[0]  # Remove fragment
            full_url = full_url.rstrip('/')    # Remove trailing slash
            if self.is_valid_url(full_url):
                links.append(full_url)
        return links

    def process_pdf(self, url: str) -> Dict[str, Any]:
        """Download and extract text from a PDF."""
        # Fetch the raw PDF bytes with Playwright's request API; rendering the page
        # with page.pdf() would produce a print-to-PDF of the viewer, not the original file.
        with sync_playwright() as p:
            request_context = p.request.new_context()
            try:
                response = request_context.get(url, timeout=60000)  # 60 second timeout
                pdf_data = response.body()
                request_context.dispose()
            except Exception as e:
                request_context.dispose()
print(f"Error downloading PDF {url}: {str(e)}") return None # Create a temporary file to use with PyMuPDF temp_path = os.path.join(self.output_dir, "temp.pdf") with open(temp_path, "wb") as f: f.write(pdf_data) # Extract text from PDF doc = pymupdf.open(temp_path) text = "" for page_num in range(doc.page_count): page = doc[page_num] text += page.get_text() doc.close() # Remove temporary file os.remove(temp_path) # Extract metadata from URL parsed = urlparse(url) path_parts = [p for p in parsed.path.strip("/").split("/") if p] filename = os.path.basename(url) # Categorize PDF content categories = [] if re.search(r'\b(visa|i-20|i20|sevis|immigration)\b', text, re.I): categories.append('immigration') if re.search(r'\b(opt|cpt|employment|work|job|internship)\b', text, re.I): categories.append('employment') if re.search(r'\b(tuition|fee|payment|cost|financial)\b', text, re.I): categories.append('fees') if re.search(r'\b(housing|accommodation|apartment|dorm|living)\b', text, re.I): categories.append('housing') # Build structured document document = { 'url': url, 'title': filename or os.path.basename(url), 'content': text, 'document_type': 'pdf', 'categories': categories, 'scraped_at': datetime.datetime.now().isoformat(), 'path_hierarchy': path_parts, 'domain': parsed.netloc } return document def scrape(self, max_pages: int = 100, max_depth: int = 4) -> None: """Main scraping function.""" pages_scraped = 0 depth_map = {self.seed_url: 0} # Track depth of each URL with sync_playwright() as p: browser = p.chromium.launch() page = browser.new_page() while self.queue and pages_scraped < max_pages: url = self.queue.pop(0) current_depth = depth_map.get(url, 0) if current_depth > max_depth: continue if url in self.visited_urls: continue try: print(f"Scraping: {url} (depth: {current_depth})") self.visited_urls.add(url) # Handle PDFs separately if url.lower().endswith('.pdf'): document = self.process_pdf(url) if document: # Save the document filename = f"{pages_scraped:04d}_{urlparse(url).netloc.replace('.', '_')}.json" filepath = os.path.join(self.output_dir, filename) with open(filepath, 'w') as f: json.dump(document, f, indent=2) pages_scraped += 1 else: # Regular webpage try: page.goto(url, timeout=30000) # 30 second timeout page.wait_for_load_state('networkidle', timeout=10000) # Wait for page to load html = page.content() # Extract content document = self.extract_content(html, url) # Save the document filename = f"{pages_scraped:04d}_{urlparse(url).netloc.replace('.', '_')}.json" filepath = os.path.join(self.output_dir, filename) with open(filepath, 'w') as f: json.dump(document, f, indent=2) pages_scraped += 1 # Extract links for further scraping if we haven't reached max depth if current_depth < max_depth: links = self.extract_links(html, url) for link in links: if link not in self.visited_urls and link not in self.queue: self.queue.append(link) depth_map[link] = current_depth + 1 except Exception as e: print(f"Error processing page {url}: {str(e)}") continue except Exception as e: print(f"Error scraping {url}: {str(e)}") # Add a small delay to be nice to the server time.sleep(1) browser.close() print(f"Scraping completed. Scraped {pages_scraped} pages.") # Example usage if __name__ == "__main__": scraper = BuffaloScraper() scraper.scrape(max_pages=100, max_depth=4)