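"""Streamlit front-end for the LLM Web Scraper.

Ties together SecureScraper (crawl4ai/Playwright-based scraping with
optional proxy and user-agent rotation) and LLMProcessor (open-source
Hugging Face models) behind a simple UI, with a debug panel and a
connectivity test mode.
"""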
import traceback
import streamlit as st
import json
import logging
import os
import sys
import requests
import asyncio
import subprocess

# Import our custom classes
from secure_scraper import SecureScraper
from llm_processor import LLMProcessor
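# Interfaces assumed by this app (inferred from the call sites below;
# defined in the sibling secure_scraper.py / llm_processor.py modules):
#   SecureScraper(proxy_list=...).scrape_url(url, css_selectors=None)
#       -> {'status': ..., 'message': ..., 'privacy': ..., 'data': ...}
#   LLMProcessor(model_name=...).process_data(text, instruction) -> str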
# Try to import crawl4ai for debug information
try:
    import crawl4ai
    CRAWL4AI_IMPORTED = True
except ImportError:
    CRAWL4AI_IMPORTED = False

# Try to import playwright for the browser check; import sync_playwright
# here too so a missing playwright package doesn't crash the whole app
try:
    import playwright
    from playwright.sync_api import sync_playwright
    PLAYWRIGHT_IMPORTED = True
except ImportError:
    PLAYWRIGHT_IMPORTED = False
# Set up logging to both stdout and a local debug log file
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("scraper_debug.log")
    ]
)
def check_playwright_browsers():
    """Check whether the Playwright browsers are installed.

    Returns an (ok, message) tuple so callers can branch on the flag and
    surface the message to the user.
    """
    if not PLAYWRIGHT_IMPORTED:
        return False, "Playwright is not installed. Install with: pip install playwright"
    try:
        # Verify the Playwright CLI is callable in this environment
        subprocess.run(
            [sys.executable, "-m", "playwright", "install", "--help"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=5
        )

        # Ask Playwright where each browser executable should live, then
        # check whether at least one of them actually exists on disk
        with sync_playwright() as p:
            browser_paths = [
                p.chromium.executable_path,
                p.firefox.executable_path,
                p.webkit.executable_path,
            ]
        browser_exists = any(os.path.exists(path) for path in browser_paths)

        if not browser_exists:
            return False, "Playwright browsers are not installed. Run: playwright install"
        return True, "Playwright browsers appear to be installed"
    except Exception as e:
        return False, f"Error checking Playwright: {str(e)}"
def main():
    st.set_page_config(
        page_title="LLM Web Scraper",
        page_icon="🕸️",
        layout="wide",
    )

    st.title("🕸️ LLM Web Scraper")
    st.write("Scrape web content with privacy protection and open-source LLM processing - by Mokshith Salian")

    # Check for Playwright browsers
    browsers_ok, browsers_message = check_playwright_browsers()
    if not browsers_ok:
        st.warning(f"⚠️ {browsers_message}")
        st.info("To install the required browsers, run this command in your terminal:")
        st.code("playwright install")

        # Optional: offer a button that attempts the installation from the app
        if st.button("Try automatic installation"):
            try:
                with st.spinner("Installing Playwright browsers..."):
                    result = subprocess.run(
                        [sys.executable, "-m", "playwright", "install"],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True,
                        timeout=120
                    )
                if result.returncode == 0:
                    st.success("Installation successful! Please refresh the page.")
                else:
                    st.error(f"Installation failed: {result.stderr}")
                    st.code(result.stdout)
            except Exception as e:
                st.error(f"Error during installation: {str(e)}")
                st.info("Please run the command manually in your terminal.")
    # Debug expander at the top
    with st.expander("Debug Information", expanded=False):
        st.write("Python version:", sys.version)
        st.write("Requests version:", requests.__version__)

        # crawl4ai debug information
        if CRAWL4AI_IMPORTED:
            try:
                st.write("crawl4ai version:", getattr(crawl4ai, "__version__", "Unknown"))
                st.write("crawl4ai available methods:",
                         [method for method in dir(crawl4ai) if not method.startswith("_")])
            except Exception:
                st.write("crawl4ai is installed but version information is not available")
        else:
            st.error("crawl4ai not installed!")

        # Playwright debug information
        if PLAYWRIGHT_IMPORTED:
            # The playwright package doesn't expose __version__ directly
            try:
                # Try to get the version from package metadata if available
                from importlib.metadata import version
                playwright_version = version("playwright")
            except Exception:
                # Fall back to asking pip via a subprocess
                try:
                    result = subprocess.run(
                        [sys.executable, "-m", "pip", "show", "playwright"],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True,
                        timeout=5
                    )
                    for line in result.stdout.split("\n"):
                        if line.startswith("Version:"):
                            playwright_version = line.split("Version:")[1].strip()
                            break
                    else:
                        playwright_version = "Unknown"
                except Exception:
                    playwright_version = "Unknown"
            st.write("Playwright version:", playwright_version)

            # Check if browsers are installed
            browsers_ok, browsers_message = check_playwright_browsers()
            st.write(f"Playwright browsers: {browsers_message}")
        else:
            st.error("Playwright not installed!")

        try:
            import transformers
            st.write("Transformers version:", transformers.__version__)
        except ImportError:
            st.error("Transformers not installed!")

        try:
            import torch
            st.write("PyTorch version:", torch.__version__)
            st.write("CUDA available:", torch.cuda.is_available())
            if torch.cuda.is_available():
                st.write("CUDA device:", torch.cuda.get_device_name(0))
        except ImportError:
            st.error("PyTorch not installed!")
    # Configuration section
    with st.sidebar:
        st.header("Configuration")

        st.subheader("LLM Model Selection")
        model_option = st.selectbox(
            "Choose LLM Model",
            [
                "microsoft/phi-2 (fastest, 2.7B)",
                "google/gemma-2b (balanced)",
                "mistralai/Mistral-7B-Instruct-v0.2 (best quality, slowest)"
            ],
            index=0
        )
        # The model name is everything before the first space, e.g. "microsoft/phi-2"
        model_name = model_option.split(" ")[0]

        st.subheader("Privacy Settings")
        use_proxy = st.checkbox("Use Proxy Rotation", value=False)
        use_user_agent = st.checkbox("Use User-Agent Rotation", value=True)

        # AsyncWebCrawler-specific settings
        st.subheader("Crawler Settings")
        max_connections = st.slider("Max Connections", min_value=1, max_value=20, value=10)
        timeout_seconds = st.slider("Request Timeout (seconds)", min_value=5, max_value=60, value=30)
        max_retries = st.slider("Max Retries", min_value=1, max_value=10, value=5)

        test_mode = st.checkbox("Enable Test Mode", value=False)
    # If in test mode, show a simplified test interface
    if test_mode:
        st.header("🔍 Test Mode")
        st.info("This mode lets you test basic web connectivity without the full pipeline")

        test_url = st.text_input("Test URL", "https://www.example.com")
        if st.button("Test Connection"):
            try:
                with st.spinner("Testing connection..."):
                    # First try with requests for basic connectivity
                    basic_response = requests.get(test_url, timeout=10)
                    st.success(f"Basic HTTP connection successful: Status {basic_response.status_code}")

                    # Then try with our crawler
                    st.info("Now testing with crawl4ai integration...")

                    # Configure proxy list based on user selection
                    proxy_list = None
                    if use_proxy:
                        # Example proxy list - in production you'd load from a secured source
                        proxy_list = [
                            "http://example-proxy1.com:8080",
                            "http://example-proxy2.com:8080"
                        ]

                    # Initialize the scraper with the configured settings
                    test_scraper = SecureScraper(proxy_list=proxy_list)
                    test_scraper.crawler.max_connections = max_connections
                    test_scraper.crawler.timeout = timeout_seconds

                    result = test_scraper.scrape_url(test_url)

                    if result['status'] == 'success':
                        st.success("crawl4ai connection successful")
                        st.write("Privacy settings used:")
                        st.json(result['privacy'])
                        with st.expander("Response Preview"):
                            st.write(result['data']['title'])
                            text = result['data']['text']
                            st.write(text[:1000] + "..." if len(text) > 1000 else text)
                    else:
                        st.error(f"crawl4ai connection failed: {result['message']}")
            except Exception as e:
                st.error(f"Connection failed: {str(e)}")
                st.code(traceback.format_exc())
    # Input section
    st.header("Scraping Target")
    url = st.text_input("Enter the URL to scrape", placeholder="https://example.com/")

    with st.expander("Advanced Scraping Options"):
        css_selectors_text = st.text_area(
            "CSS Selectors (JSON format)",
            placeholder='{"title": "h1", "price": ".product-price", "description": ".product-description"}'
        )

        # Parse CSS selectors
        css_selectors = None
        if css_selectors_text:
            try:
                css_selectors = json.loads(css_selectors_text)
            except json.JSONDecodeError:
                st.error("Invalid JSON for CSS selectors")

    st.header("LLM Processing")
    llm_instruction = st.text_area(
        "What do you want the LLM to do with the scraped data?",
        placeholder="Extract the main product features and summarize them in bullet points"
    )
    # Run the pipeline on button click
    if st.button("Scrape and Process"):
        if not url:
            st.error("Please enter a URL to scrape")
            return

        # Show progress
        with st.spinner("Initializing scraper..."):
            # Configure proxy list based on user selection
            proxy_list = None
            if use_proxy:
                st.warning("Using proxy rotation - in a production system, you'd want to use paid proxies")
                # Example proxy list - in production you'd load from a secured source
                proxy_list = [
                    "http://example-proxy1.com:8080",
                    "http://example-proxy2.com:8080"
                ]

            # Initialize the scraper with the configured settings
            scraper = SecureScraper(proxy_list=proxy_list)

            # Update AsyncWebCrawler settings based on user input
            scraper.crawler.max_connections = max_connections
            scraper.crawler.timeout = timeout_seconds
            scraper.crawler.random_user_agent = use_user_agent

        # Perform scraping
        with st.spinner("Scraping website..."):
            # First, test basic connectivity with a direct request
            st.info(f"Testing basic connectivity to {url}")
            try:
                test_response = requests.get(url, timeout=10)
                st.success(f"Basic connection successful: HTTP {test_response.status_code}")
            except Exception as e:
                st.warning(f"Basic connection test failed: {str(e)}. Trying with crawl4ai anyway...")

            # Check if Playwright browsers are installed before scraping
            browsers_ok, _ = check_playwright_browsers()
            if not browsers_ok:
                st.error("Cannot scrape: Playwright browsers are not installed. Please install them first.")
                return

            try:
                # Now perform the actual scraping with our scraper
                result = scraper.scrape_url(url, css_selectors)
                if result['status'] == 'error':
                    st.error(f"Scraping failed: {result['message']}")
                    return
            except Exception as e:
                if "Executable doesn't exist" in str(e):
                    st.error("Error: Playwright browser not found. Please install using the button at the top of the page.")
                else:
                    st.error(f"Scraping error: {str(e)}")
                    st.code(traceback.format_exc())
                return

        st.success("Scraping completed successfully!")

        # Display privacy measures used
        st.subheader("Privacy Measures Used")
        st.json(result['privacy'])

        # Display raw scraped data
        with st.expander("Raw Scraped Data"):
            st.json(result['data'])

        # Defensive guard: don't attempt LLM processing if scraping failed
        if result['status'] == 'error':
            return
        # Process with LLM
        with st.spinner(f"Processing with {model_name}..."):
            try:
                llm = LLMProcessor(model_name=model_name)

                # Prepare data for the LLM (serialize to a string if it's a dict)
                scraped_data_str = (json.dumps(result['data'], indent=2)
                                    if isinstance(result['data'], dict) else result['data'])

                processed_result = llm.process_data(
                    scraped_data_str,
                    llm_instruction if llm_instruction else "Summarize this information"
                )

                st.subheader("LLM Processing Result")
                st.write(processed_result)
            except Exception as e:
                st.error(f"Error in LLM processing: {str(e)}")
                st.info("Try using a smaller model like microsoft/phi-2 if you're facing memory issues")
                logging.error(f"LLM processing error: {str(e)}")
                logging.error(traceback.format_exc())
# Utility for running async code in Streamlit
def run_async_code(coro):
    """Run an async coroutine to completion inside a Streamlit app."""
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)
    finally:
        asyncio.set_event_loop(None)
        loop.close()
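# run_async_code is currently unused: the app calls the synchronous
# SecureScraper.scrape_url. A hypothetical usage, assuming the crawler
# exposed an async coroutine such as `arun` (not confirmed by this code):
#
#     result = run_async_code(scraper.crawler.arun(url))
#
# A fresh event loop per call is needed because Streamlit scripts run in
# a worker thread without a running asyncio loop.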
if __name__ == "__main__":
    main()