from fastapi import FastAPI, HTTPException, Request
from pydantic import HttpUrl
from playwright.async_api import async_playwright
from urllib.parse import urljoin, urlparse
import logging
from fastapi.responses import JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from typing import List, Dict
import asyncio
import os
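
# Operational note (an assumption about the runtime, not stated in this file):
# p.chromium.launch() below requires the Chromium browser bundle to be installed
# in the environment, e.g. via `playwright install chromium`.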

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

app = FastAPI(title="Website Scraper API with Frontend")

# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")

# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")
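
# Note (assumption about deployment layout): a "static" directory must exist for
# the StaticFiles mount above, and "templates/index.html" must be present for the
# home page to render.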

# Maximum number of pages to scrape
MAX_PAGES = 20


async def scrape_page(url: str, visited: set, base_domain: str) -> tuple[Dict, set]:
    """Scrape a single page for text, images, and links using Playwright."""
    try:
        logger.info(f"Starting Playwright for URL: {url}")
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
                viewport={"width": 800, "height": 600},  # Reduced viewport for performance
                bypass_csp=True  # Bypass Content Security Policy
            )
            page = await context.new_page()

            # Retry navigation with fallback
            for attempt in range(2):  # Try up to 2 times
                try:
                    logger.info(f"Navigating to {url} (Attempt {attempt + 1})")
                    await page.goto(url, wait_until="domcontentloaded", timeout=30000)  # 30s timeout
                    break  # Success, exit retry loop
                except Exception as e:
                    logger.warning(f"Navigation attempt {attempt + 1} failed for {url}: {str(e)}")
                    if attempt == 1:  # Last attempt
                        logger.error(f"All navigation attempts failed for {url}")
                        await browser.close()
                        return {}, set()
                    await asyncio.sleep(1)  # Wait before retry

            # Scroll to trigger lazy-loaded images
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await page.wait_for_timeout(2000)  # Wait for lazy-loaded content

            # Extract text content
            text_content = await page.evaluate(
                """() => document.body.innerText"""
            )
            text_content = ' '.join(text_content.split()) if text_content else ""

            # Extract images from src, data-src, and srcset
            images = await page.evaluate(
                """() => {
                    const imgElements = document.querySelectorAll('img');
                    const imgUrls = new Set();
                    imgElements.forEach(img => {
                        if (img.src) imgUrls.add(img.src);
                        if (img.dataset.src) imgUrls.add(img.dataset.src);
                        if (img.srcset) {
                            img.srcset.split(',').forEach(src => {
                                const url = src.trim().split(' ')[0];
                                if (url) imgUrls.add(url);
                            });
                        }
                    });
                    return Array.from(imgUrls);
                }"""
            )
            images = [urljoin(url, img) for img in images if img]

            # Extract links
            links = await page.evaluate(
                """() => Array.from(document.querySelectorAll('a')).map(a => a.href)"""
            )
            links = set(
                urljoin(url, link)
                for link in links
                if urlparse(urljoin(url, link)).netloc == base_domain
                and urljoin(url, link) not in visited
            )

            await browser.close()
            logger.info(f"Successfully scraped {url}")
            page_data = {
                "url": url,
                "text": text_content,
                "images": images
            }
            return page_data, links
    except Exception as e:
        logger.error(f"Error scraping {url}: {str(e)}")
        return {}, set()
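
# Quick standalone check of scrape_page (a sketch for local testing only, not
# part of the app's request flow; example.com is an arbitrary target):
#
#     data, links = asyncio.run(scrape_page("https://example.com", set(), "example.com"))
#     print(data.get("url"), len(data.get("images", [])), len(links))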


# NOTE: no route decorator appears in the extracted source; "/scrape" is an
# assumed path added here so the crawler is reachable as an HTTP endpoint.
@app.get("/scrape")
async def crawl_website(url: HttpUrl):
    """Crawl the website starting from the given URL and return scraped data for up to MAX_PAGES pages as JSON."""
    try:
        logger.info(f"Starting crawl for {url}")
        visited = set()
        to_visit = {str(url)}
        base_domain = urlparse(str(url)).netloc
        results = []
        while to_visit and len(visited) < MAX_PAGES:
            current_url = to_visit.pop()
            if current_url in visited:
                continue
            logger.info(f"Scraping: {current_url}")
            visited.add(current_url)
            page_data, new_links = await scrape_page(current_url, visited, base_domain)
            if page_data:
                results.append(page_data)
            to_visit.update(new_links)
            # Small delay to avoid overwhelming the target server
            await asyncio.sleep(0.5)
        logger.info(f"Crawl completed for {url}")
        return JSONResponse(content={"pages": results})
    except Exception as e:
        logger.error(f"Scraping failed for {url}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
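
# Example response body for the crawl above (illustrative values only):
#
#     {"pages": [{"url": "https://example.com",
#                 "text": "Example Domain ...",
#                 "images": ["https://example.com/image.png"]}]}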


# NOTE: as above, no decorator is present in the extracted source; serving the
# frontend at "/" is an assumption.
@app.get("/")
async def serve_home(request: Request):
    """Serve the frontend HTML page."""
    logger.info("Serving home page")
    return templates.TemplateResponse("index.html", {"request": request})


if __name__ == "__main__":
    logger.info("Starting FastAPI server on port 7860")
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
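
# Example request once the server is running (uses the assumed /scrape path
# noted above; adjust if the real route differs):
#
#     curl "http://localhost:7860/scrape?url=https://example.com"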