```python
import asyncio

from bs4 import BeautifulSoup
from crewai.tools import BaseTool
from playwright.async_api import async_playwright
from playwright_stealth import Stealth


class StealthScrapeTool(BaseTool):
    name: str = "Stealth Web Scraper"
    description: str = (
        "A tool for stealthily scraping content from a given URL "
        "using Playwright and a CSS selector."
    )

    async def _arun(
        self,
        website_url: str,
        css_element: str = "body",
        wait_for_selectors: list[str] | None = None,
    ) -> str:
        try:
            async with Stealth().use_async(async_playwright()) as p:
                browser = await p.chromium.launch(headless=True)
                page = await browser.new_page()
                print(f"StealthScrapeTool: Starting scraping for {website_url}...")
                print(f"StealthScrapeTool: Navigating to {website_url}")
                await page.goto(website_url, timeout=120000)
                await asyncio.sleep(5)

                # Scroll to the bottom of the page repeatedly to load all dynamic content.
                print("StealthScrapeTool: Scrolling through the page to load dynamic content...")
                print("StealthScrapeTool: Getting initial scrollHeight...")
                last_height = await page.evaluate("document.body.scrollHeight")
                print(f"StealthScrapeTool: Initial scrollHeight: {last_height}")
                scroll_attempts = 0
                max_scroll_attempts = 10
                while scroll_attempts < max_scroll_attempts:
                    print(f"StealthScrapeTool: Scroll attempt {scroll_attempts + 1}")
                    print("StealthScrapeTool: Scrolling to bottom...")
                    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    print("StealthScrapeTool: Scrolled. Waiting for content to load...")
                    await asyncio.sleep(5)
                    print("StealthScrapeTool: Getting new scrollHeight...")
                    new_height = await page.evaluate("document.body.scrollHeight")
                    print(f"StealthScrapeTool: New scrollHeight: {new_height}")
                    if new_height == last_height:
                        print("StealthScrapeTool: ScrollHeight unchanged. Breaking scroll loop.")
                        break
                    last_height = new_height
                    scroll_attempts += 1
                print("StealthScrapeTool: Finished scrolling.")
                print(f"StealthScrapeTool: Page loaded. Attempting to find element with selector '{css_element}'")

                # Wait for the target element (plus any extra selectors) to be attached to the DOM.
                selectors_to_wait_for = []
                if wait_for_selectors:
                    print("StealthScrapeTool: Additional selectors to wait for provided.")
                    selectors_to_wait_for.extend(wait_for_selectors)
                # Always include css_element in the list of selectors to wait for.
                selectors_to_wait_for.append(css_element)
                combined_selector = ", ".join(selectors_to_wait_for)
                print(f"StealthScrapeTool: Waiting for selectors: {combined_selector}")
                await page.wait_for_selector(combined_selector, timeout=60000, state="attached")
                print("StealthScrapeTool: Required elements found. Extracting content...")

                html_content = await page.content()
                await browser.close()
                soup = BeautifulSoup(html_content, "html.parser")

                # Debug print to confirm the waited-for element made it into the scraped content.
                if soup.select_one("#all-reviews"):
                    print("StealthScrapeTool: #all-reviews found in scraped content.")
                else:
                    print("StealthScrapeTool: #all-reviews NOT found in scraped content.")

                target_element = soup.select_one(css_element)
                if target_element is None:
                    return f"Error: Could not find element with selector '{css_element}' on the page."

                # Clean the HTML: drop heavyweight tags the downstream agent does not need.
                print(f"Successfully found element with selector '{css_element}'. Cleaning content...")
                for tag in target_element.find_all(["script", "style", "img", "svg", "iframe", "source"]):
                    tag.decompose()
                # Remove inline style attributes from all remaining tags.
                for tag in target_element.find_all(True):
                    if "style" in tag.attrs:
                        del tag["style"]
                return target_element.prettify()
        except Exception as e:
            return f"Error during stealth web scraping: {e}"

    def _run(
        self,
        website_url: str,
        css_element: str = "body",
        wait_for_selectors: list[str] | None = None,
    ) -> str:
        # Synchronous entry point. CrewAI typically calls _arun for async tools,
        # so this wrapper simply drives the coroutine to completion. Note that
        # asyncio.run() will raise if an event loop is already running.
        return asyncio.run(self._arun(website_url, css_element, wait_for_selectors))
```
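To try the tool on its own, outside a crew, you can await `_arun` directly. This is a minimal sketch assuming the class above is defined in the same module; the URL and selectors here are placeholders, not values from the original project:

```python
# Standalone smoke test for StealthScrapeTool (hypothetical URL and selectors).
import asyncio


async def main() -> None:
    tool = StealthScrapeTool()
    html = await tool._arun(
        "https://example.com/product/123",   # placeholder URL
        css_element="#all-reviews",          # placeholder target selector
        wait_for_selectors=["footer"],       # optional extra selectors to wait for
    )
    print(html[:500])  # preview the cleaned, prettified HTML


asyncio.run(main())
```

Calling `_arun` directly avoids the `asyncio.run()` call inside `_run`, which would fail in environments that already have a running event loop (Jupyter, for example).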