|
""" |
|
Web Research Tools
|
Advanced web research using DuckDuckGo search and Crawl4AI content extraction |
|
""" |
|
|
|
import asyncio
import functools
import logging
import os
from typing import List, Dict, Any, Optional

import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
|
|
|
|
|
# Crawl4AI is an optional dependency. CRAWL4AI_AVAILABLE records whether the
# import succeeded so the rest of the module can choose between the Crawl4AI
# pipeline and the requests + BeautifulSoup fallback.
try:
    from crawl4ai import (
        AsyncWebCrawler,
        BrowserConfig,
        CrawlerRunConfig,
        CacheMode,
        LLMConfig,
        LLMContentFilter,
        DefaultMarkdownGenerator,
    )
except ImportError:
    CRAWL4AI_AVAILABLE = False
    print("⚠️ Crawl4AI not available, using fallback web scraping")
else:
    CRAWL4AI_AVAILABLE = True

# Module-level logger; failures are reported here in addition to stdout.
logger = logging.getLogger(__name__)
|
|
|
|
|
class WebResearcher:
    """Research a topic by searching DuckDuckGo and extracting page content.

    Content extraction uses Crawl4AI (with an LLM content filter when one can
    be configured) if ``CRAWL4AI_AVAILABLE`` is true and a Playwright Chromium
    executable is found on disk. Otherwise pages are fetched with ``requests``
    and parsed with ``BeautifulSoup``. Progress is reported with ``print``;
    failures are additionally sent to the module ``logger``.
    """

    # Model used for OpenAI filtering and for any provider name not listed below.
    _DEFAULT_LLM_MODEL = "openai/gpt-4o-mini"

    # Short provider name -> Crawl4AI "provider/model" identifier.
    _PROVIDER_MAPPING = {
        "openai": "openai/gpt-4o-mini",
        "google": "gemini/gemini-2.0-flash-exp",
        "gemini": "gemini/gemini-2.0-flash-exp",
        # NOTE(review): "anthropic" is routed to a Gemini model; presumably
        # intentional because no Anthropic model is configured here - confirm.
        "anthropic": "gemini/gemini-2.0-flash-exp",
    }

    # Substrings of an exception message that indicate missing Playwright
    # browser binaries (in which case the simple scraper is used instead).
    _PLAYWRIGHT_ERROR_MARKERS = (
        "Executable doesn't exist",
        "BrowserType.launch",
        "playwright install",
        "Playwright was just installed",
        "download new browsers",
        "chromium-",
        "chrome-linux/chrome",
    )

    # CSS selectors tried in order to locate the main content of a page.
    _CONTENT_SELECTORS = (
        'main', 'article', '.content', '#content',
        '.post-content', '.entry-content', '.article-content',
    )

    # Headers sent by the fallback scraper.
    _REQUEST_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }

    def __init__(self, max_results: int = 10, max_crawl_pages: int = 7,
                 llm_provider: Optional[str] = None):
        """Configure the researcher.

        Args:
            max_results: Maximum number of search results requested.
            max_crawl_pages: Maximum number of URLs whose content is extracted.
            llm_provider: Short provider name used to pick the content-filter
                model; a falsy value is replaced by ``"openai"``.
        """
        self.max_results = max_results
        self.max_crawl_pages = max_crawl_pages
        self.llm_provider = llm_provider or "openai"

        if CRAWL4AI_AVAILABLE:
            self.browser_config = BrowserConfig(
                headless=True,
                viewport_width=1280,
                viewport_height=720,
            )
        else:
            self.browser_config = None
            print("π Using fallback web scraping (requests + BeautifulSoup)")

    @classmethod
    def _resolve_llm_provider(cls, provider: Any) -> str:
        """Map a short provider name to a Crawl4AI provider/model identifier."""
        mapping = cls._PROVIDER_MAPPING
        # Exact match first so existing callers resolve exactly as before.
        if provider in mapping:
            return mapping[provider]
        # Then tolerate case and surrounding whitespace ("Gemini", " openai ").
        if isinstance(provider, str):
            return mapping.get(provider.strip().lower(), cls._DEFAULT_LLM_MODEL)
        return cls._DEFAULT_LLM_MODEL

    @staticmethod
    def _failure_record(url: str, error: Any) -> Dict[str, Any]:
        """Build the per-URL record stored when extraction fails."""
        return {
            "url": url,
            "title": "",
            "content": "",
            "word_count": 0,
            "extraction_success": False,
            "error": error,
        }

    @staticmethod
    def _failed_research(topic: str, summary: str) -> Dict[str, Any]:
        """Build the result of an unsuccessful research run.

        Carries the same keys as a successful result so callers can read
        ``total_words`` / ``successful_sources`` without checking ``success``.
        """
        return {
            "topic": topic,
            "search_results": [],
            "extracted_content": [],
            "summary": summary,
            "total_words": 0,
            "successful_sources": 0,
            "success": False,
        }

    def _search_blocking(self, topic: str, region: str) -> List[Dict[str, Any]]:
        """Run the synchronous DuckDuckGo query and normalise its results."""
        with DDGS() as ddgs:
            search_results = ddgs.text(
                keywords=topic,
                region=region,
                safesearch="moderate",
                max_results=self.max_results,
            )
            return [
                {
                    "title": result.get("title", ""),
                    "url": result.get("href", ""),
                    "snippet": result.get("body", ""),
                    "source": "duckduckgo",
                }
                for result in (search_results or [])
            ]

    async def search_topic(self, topic: str, region: str = "us-en") -> List[Dict[str, Any]]:
        """Search DuckDuckGo for ``topic``.

        Returns:
            A list of dicts with ``title``, ``url``, ``snippet`` and ``source``
            keys, or an empty list if the search raised.
        """
        try:
            print(f"π Searching DuckDuckGo for: {topic}")

            # DDGS is synchronous; run it in the default executor so the
            # event loop is not blocked while the query is in flight.
            loop = asyncio.get_running_loop()
            results = await loop.run_in_executor(
                None, self._search_blocking, topic, region
            )

            print(f"✅ Found {len(results)} search results")
            return results

        except Exception as e:
            logger.error("Search failed: %s", e)
            print(f"❌ Search failed: {e}")
            return []

    async def _fallback_extract_content(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Extract page text with ``requests`` and ``BeautifulSoup``.

        Only the first ``max_crawl_pages`` URLs are fetched. Every attempted
        URL yields one record; failures are recorded with
        ``extraction_success`` set to ``False`` and an ``error`` message.
        """
        extracted_content = []
        targets = urls[:self.max_crawl_pages]
        loop = asyncio.get_running_loop()

        for i, url in enumerate(targets):
            try:
                print(f"π Scraping {i+1}/{len(targets)}: {url}")

                # requests is blocking; fetch in the default executor so other
                # tasks on the event loop keep running during the download.
                response = await loop.run_in_executor(
                    None,
                    functools.partial(
                        requests.get, url, headers=self._REQUEST_HEADERS, timeout=10
                    ),
                )
                response.raise_for_status()

                soup = BeautifulSoup(response.content, 'html.parser')

                # Drop elements that never carry article text.
                for element in soup(["script", "style", "nav", "footer", "header"]):
                    element.decompose()

                # get_text() instead of .string: .string is None for an empty
                # <title> or one with nested markup, which used to raise and
                # discard an otherwise successful page.
                title = soup.title.get_text(strip=True) if soup.title else ""

                # Prefer a recognised main-content container ...
                content = ""
                for selector in self._CONTENT_SELECTORS:
                    content_elem = soup.select_one(selector)
                    if content_elem is not None:
                        content = content_elem.get_text(separator='\n', strip=True)
                        break

                # ... and fall back to the text of the whole document.
                if not content:
                    content = soup.get_text(separator='\n', strip=True)

                # Remove blank lines and per-line padding.
                lines = [line.strip() for line in content.split('\n') if line.strip()]
                content = '\n'.join(lines)

                word_count = len(content.split())

                extracted_content.append({
                    "url": url,
                    "title": title,
                    "content": content,
                    "word_count": word_count,
                    "extraction_success": True,
                })

                print(f"✅ Extracted {word_count} words from {url}")

            except Exception as e:
                logger.error("Error scraping %s: %s", url, e)
                print(f"❌ Error scraping {url}: {e}")
                extracted_content.append(self._failure_record(url, str(e)))

        successful_extractions = [c for c in extracted_content if c["extraction_success"]]
        print(f"✅ Successfully extracted content from {len(successful_extractions)}/{len(urls)} URLs")

        return extracted_content

    def _build_markdown_generator(self, topic: str) -> Any:
        """Create the Crawl4AI markdown generator for ``topic``.

        Tries to attach an LLM content filter; if configuring it raises, a
        generator without a content filter is returned instead.
        """
        try:
            crawl4ai_provider = self._resolve_llm_provider(self.llm_provider)
            use_gemini = crawl4ai_provider.startswith("gemini")

            if use_gemini and not os.getenv("GOOGLE_API_KEY"):
                print("⚠️ GOOGLE_API_KEY not found, falling back to OpenAI")
                use_gemini = False
                status = "🧠 Using OpenAI for content filtering: gpt-4o-mini (fallback)"
            elif use_gemini:
                status = f"🧠 Using Gemini for content filtering: {crawl4ai_provider}"
            else:
                status = "🧠 Using OpenAI for content filtering: gpt-4o-mini"

            if use_gemini:
                llm_config = LLMConfig(
                    provider=crawl4ai_provider,
                    api_token="env:GOOGLE_API_KEY",
                )
            else:
                llm_config = LLMConfig(
                    provider=self._DEFAULT_LLM_MODEL,
                    api_token="env:OPENAI_API_KEY",
                )
            print(status)

            content_filter = LLMContentFilter(
                llm_config=llm_config,
                instruction=f"""
                Extract educational content related to "{topic}".
                Focus on:
                - Key concepts and explanations
                - Practical examples and tutorials
                - Technical details and specifications
                - Best practices and guidelines
                - Code examples and implementations

                Exclude:
                - Navigation menus and sidebars
                - Advertisements and promotional content
                - Footer content and legal text
                - Unrelated content

                Format as clean markdown with proper headers and code blocks.
                """,
                chunk_token_threshold=1000,
                verbose=False,
            )

            return DefaultMarkdownGenerator(
                content_filter=content_filter,
                options={"ignore_links": False},
            )

        except Exception as e:
            print(f"⚠️ Could not configure LLM content filter: {e}")
            return DefaultMarkdownGenerator(
                options={"ignore_links": False}
            )

    async def extract_content(self, urls: List[str], topic: str) -> List[Dict[str, Any]]:
        """Extract content from ``urls``, preferring Crawl4AI.

        Falls back to ``_fallback_extract_content`` when Crawl4AI is not
        importable, when the Playwright Chromium executable is missing or the
        check raises, or when crawling fails with a message that matches one of
        ``_PLAYWRIGHT_ERROR_MARKERS``. Any other crawl-level failure returns an
        empty list.

        Returns:
            One record per attempted URL (at most ``max_crawl_pages``) with
            ``url``, ``title``, ``content``, ``word_count`` and
            ``extraction_success`` keys, plus ``error`` on failure.
        """
        if not CRAWL4AI_AVAILABLE:
            print("π Using fallback content extraction (Crawl4AI not available)")
            return await self._fallback_extract_content(urls)

        # Check for a usable browser binary. The fallback itself runs outside
        # this try block so that an error raised while scraping is not mistaken
        # for a failed Playwright check (which would run the scraper twice).
        fallback_notice = None
        try:
            from playwright.async_api import async_playwright
            async with async_playwright() as p:
                browser_path = p.chromium.executable_path
            if not browser_path or not os.path.exists(browser_path):
                fallback_notice = "π Playwright browsers not installed, using fallback content extraction"
        except Exception as e:
            fallback_notice = f"π Playwright check failed ({e}), using fallback content extraction"

        if fallback_notice is not None:
            print(fallback_notice)
            return await self._fallback_extract_content(urls)

        try:
            print(f"π Extracting content from {len(urls)} URLs...")

            run_config = CrawlerRunConfig(
                markdown_generator=self._build_markdown_generator(topic),
                cache_mode=CacheMode.BYPASS,
                wait_for_images=False,
                process_iframes=False,
                remove_overlay_elements=True,
            )

            extracted_content = []
            targets = urls[:self.max_crawl_pages]

            async with AsyncWebCrawler(config=self.browser_config) as crawler:
                for i, url in enumerate(targets):
                    try:
                        print(f"π Crawling {i+1}/{len(targets)}: {url}")

                        result = await crawler.arun(url=url, config=run_config)

                        if result.success and result.markdown:
                            # NOTE(review): depending on the Crawl4AI version the
                            # LLM-filtered text may live on
                            # result.markdown.fit_markdown rather than in
                            # result.markdown itself - confirm which is wanted.
                            markdown = result.markdown
                            word_count = len(markdown.split())
                            # metadata may be missing; a successful crawl must
                            # not be recorded as failed just for lacking a title.
                            metadata = result.metadata or {}
                            extracted_content.append({
                                "url": url,
                                "title": metadata.get("title") or "",
                                "content": markdown,
                                "word_count": word_count,
                                "extraction_success": True,
                            })
                            print(f"✅ Extracted {word_count} words from {url}")
                        else:
                            print(f"⚠️ Failed to extract content from {url}: {result.error_message}")
                            extracted_content.append(
                                self._failure_record(url, result.error_message)
                            )

                    except Exception as e:
                        logger.error("Error crawling %s: %s", url, e)
                        print(f"❌ Error crawling {url}: {e}")
                        extracted_content.append(self._failure_record(url, str(e)))

            successful_extractions = [c for c in extracted_content if c["extraction_success"]]
            print(f"✅ Successfully extracted content from {len(successful_extractions)}/{len(urls)} URLs")

            return extracted_content

        except Exception as e:
            logger.error("Content extraction failed: %s", e)
            print(f"❌ Content extraction failed: {e}")

            # Missing browser binaries surface here as launch errors; recover
            # with the simple scraper instead of returning nothing.
            error_str = str(e)
            if any(marker in error_str for marker in self._PLAYWRIGHT_ERROR_MARKERS):
                print("π Playwright browser binaries not available, falling back to simple web scraping")
                return await self._fallback_extract_content(urls)

            return []

    async def research_topic(self, topic: str) -> Dict[str, Any]:
        """Run the complete workflow: search, extract, then summarise counts.

        Returns:
            A dict with ``topic``, ``search_results``, ``extracted_content``,
            ``summary``, ``total_words``, ``successful_sources`` and
            ``success``. ``success`` is ``False`` when the search returned
            nothing or an exception was raised.
        """
        try:
            print(f"π Starting comprehensive research for: {topic}")

            search_results = await self.search_topic(topic)

            if not search_results:
                return self._failed_research(
                    topic, f"No search results found for {topic}"
                )

            # Skip results without a URL and collapse duplicates (keeping the
            # first occurrence) so they do not use up crawl slots.
            urls = list(dict.fromkeys(
                result["url"] for result in search_results if result.get("url")
            ))
            extracted_content = await self.extract_content(urls, topic)

            successful_content = [c for c in extracted_content if c["extraction_success"]]
            total_words = sum(c["word_count"] for c in successful_content)

            summary = f"""
            Research completed for "{topic}":
            - Found {len(search_results)} search results
            - Successfully extracted content from {len(successful_content)} sources
            - Total content: {total_words} words
            - Sources include educational articles, documentation, and tutorials
            """

            print(f"π Research completed: {len(successful_content)} sources, {total_words} words")

            return {
                "topic": topic,
                "search_results": search_results,
                "extracted_content": extracted_content,
                "summary": summary.strip(),
                "total_words": total_words,
                "successful_sources": len(successful_content),
                "success": True,
            }

        except Exception as e:
            logger.error("Research failed: %s", e)
            print(f"❌ Research failed: {e}")
            return self._failed_research(
                topic, f"Research failed for {topic}: {str(e)}"
            )
|
|
|
|
|
async def research_topic(topic: str, llm_provider: str = "openai") -> Dict[str, Any]:
    """Research ``topic`` with a freshly created ``WebResearcher``.

    Args:
        topic: Subject to search for and extract content about.
        llm_provider: Provider name handed to ``WebResearcher``.

    Returns:
        Whatever ``WebResearcher.research_topic`` returns for ``topic``.
    """
    return await WebResearcher(llm_provider=llm_provider).research_topic(topic)
|
|