import gradio as gr
import requests
import re
import logging
import json
from typing import Tuple, List, Dict, Union, Optional
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from datetime import datetime
import io
import zipfile
import os
import tempfile
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from PIL import Image
import base64
import asyncio
import yaml
from pathlib import Path
from tqdm import tqdm
import plotly.graph_objects as go
# Configure detailed logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('webscraper.log'),
logging.StreamHandler()
]
)
# Download necessary NLTK data
import nltk
try:
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)
except Exception as e:
logging.error(f"Error downloading NLTK data: {str(e)}")
# Configuration and logging setup
class Config:
DATA_DIR = Path('scraped_data')
LOGS_DIR = Path('logs')
MAX_RETRIES = 3
TIMEOUT = 30
@classmethod
def initialize(cls):
"""Initialize necessary directories and configurations"""
cls.DATA_DIR.mkdir(exist_ok=True)
cls.LOGS_DIR.mkdir(exist_ok=True)
# Setup logging
        logging.basicConfig(
            level=logging.INFO,
            force=True,  # replace the module-level handlers configured above (Python 3.8+)
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(cls.LOGS_DIR / 'app.log'),
logging.StreamHandler()
]
)
return logging.getLogger(__name__)
logger = Config.initialize()
class WebDriverManager:
"""Manage WebDriver instances"""
@staticmethod
def get_driver() -> webdriver.Chrome:
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1920,1080')
return webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
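# Illustrative sketch (not used by the app itself): how WebDriverManager is meant to be
# driven. Assumes Chrome and the webdriver-manager package are available; the URL is a
# placeholder.
def _example_webdriver_usage(url: str = "https://example.com") -> str:
    driver = WebDriverManager.get_driver()
    try:
        driver.get(url)
        return driver.title  # Page title as a quick smoke test
    finally:
        driver.quit()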
class DataExtractor:
"""Extract and process webpage content"""
def __init__(self):
self.soup = None
self.url = None
self.logger = logging.getLogger(__name__)
def set_page(self, html: str, url: str):
"""Set the page content for extraction"""
self.soup = BeautifulSoup(html, 'html.parser')
self.url = url
def extract_images(self) -> List[Dict]:
"""Extract image information from the page"""
images = []
try:
for img in self.soup.find_all('img'):
image_info = {
'src': urljoin(self.url, img.get('src', '')),
'alt': img.get('alt', ''),
'title': img.get('title', ''),
'dimensions': self._get_image_dimensions(img),
'file_type': self._get_file_type(img.get('src', ''))
}
images.append(image_info)
except Exception as e:
self.logger.error(f"Error extracting images: {str(e)}")
return images
def extract_links(self) -> List[Dict]:
"""Extract link information from the page"""
links = []
try:
for a in self.soup.find_all('a', href=True):
absolute_url = urljoin(self.url, a.get('href', ''))
link_info = {
'href': absolute_url,
'text': a.get_text(strip=True),
'title': a.get('title', ''),
                    'type': 'internal' if urlparse(absolute_url).netloc == urlparse(self.url).netloc else 'external',
'has_image': bool(a.find('img'))
}
links.append(link_info)
except Exception as e:
self.logger.error(f"Error extracting links: {str(e)}")
return links
def extract_text(self) -> List[Dict]:
"""Extract text content from the page"""
texts = []
try:
for text_element in self.soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
text_info = {
'content': text_element.get_text(strip=True),
'source': text_element.name
}
if text_info['content']: # Only add non-empty text blocks
texts.append(text_info)
except Exception as e:
self.logger.error(f"Error extracting text: {str(e)}")
return texts
def _get_image_dimensions(self, img_tag) -> str:
"""Get image dimensions from tag attributes"""
width = img_tag.get('width', '')
height = img_tag.get('height', '')
if width and height:
return f"{width}x{height}"
return "unknown"
def _get_file_type(self, src: str) -> str:
"""Determine image file type from URL"""
if not src:
return "unknown"
ext = src.split('.')[-1].lower()
return ext if ext in ['jpg', 'jpeg', 'png', 'gif', 'webp'] else "unknown"
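# Illustrative sketch (never called by the app): running DataExtractor over a small,
# hypothetical HTML snippet to show the shape of the extracted records.
def _example_data_extraction() -> Dict[str, List[Dict]]:
    sample_html = (
        '<html><body>'
        '<h1>Demo page</h1>'
        '<p>Some introductory text.</p>'
        '<a href="/about" title="About page">About</a>'
        '<img src="/static/logo.png" alt="Logo" width="64" height="64">'
        '</body></html>'
    )
    extractor = DataExtractor()
    extractor.set_page(sample_html, "https://example.com")
    return {
        'images': extractor.extract_images(),   # src resolved against the base URL
        'links': extractor.extract_links(),     # marked internal/external
        'texts': extractor.extract_text()       # headings and paragraphs only
    }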
class QueryAnalyzer:
"""Analyze natural language queries"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.stop_words = set(stopwords.words('english'))
self.lemmatizer = WordNetLemmatizer()
self.logger.info("QueryAnalyzer initialized")
def parse_query(self, query: str) -> Dict[str, Union[str, int]]:
try:
self.logger.info(f"Parsing query: {query}")
tokens = word_tokenize(query.lower())
filtered_tokens = [self.lemmatizer.lemmatize(token) for token in tokens
if token.isalnum() and token not in self.stop_words]
return {
'target': self._identify_target(filtered_tokens),
'limit': self._identify_limit(filtered_tokens),
'filters': self._identify_filters(filtered_tokens),
'output': 'JSON' if 'json' in query.lower() else 'Formatted Text'
}
except Exception as e:
self.logger.error(f"Error parsing query: {str(e)}")
            return {'target': 'unknown', 'limit': 0, 'filters': {}, 'output': 'Formatted Text'}
def _identify_target(self, tokens: List[str]) -> str:
target_map = {
'image': 'image',
'images': 'image',
'picture': 'image',
'link': 'link',
'links': 'link',
'text': 'text',
'content': 'text'
}
for token in tokens:
if token in target_map:
return target_map[token]
return 'unknown'
def _identify_limit(self, tokens: List[str]) -> int:
for token in tokens:
if token.isdigit():
return int(token)
return 0
def _identify_filters(self, tokens: List[str]) -> Dict[str, str]:
filters = {}
if 'external' in tokens:
filters['link_type'] = 'external'
elif 'internal' in tokens:
filters['link_type'] = 'internal'
if 'png' in tokens:
filters['file_type'] = 'png'
elif 'jpg' in tokens or 'jpeg' in tokens:
filters['file_type'] = 'jpg'
return filters
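# Illustrative sketch: how QueryAnalyzer turns a natural-language instruction into a
# structured query. Requires the NLTK data downloaded above; the instruction below is
# hypothetical and the exact keys depend on the wording used.
def _example_query_parsing() -> Dict[str, Union[str, int]]:
    analyzer = QueryAnalyzer()
    # Roughly: {'target': 'link', 'limit': 5, 'filters': {'link_type': 'external'}, 'output': 'JSON'}
    return analyzer.parse_query("Get the first 5 external links as JSON")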
class ResponseFormatter:
"""Format scraped data based on user preferences"""
def __init__(self):
self.logger = logging.getLogger(__name__)
def format_data(self, data: List[Dict], query_info: Dict) -> str:
try:
if not data:
return "No data found for the specified query."
# Apply filters
filtered_data = self._apply_filters(data, query_info.get('filters', {}))
# Apply limit
if query_info.get('limit', 0) > 0:
filtered_data = filtered_data[:query_info['limit']]
if query_info['output'] == "JSON":
return json.dumps({
"metadata": {
"query": query_info,
"timestamp": datetime.now().isoformat(),
"results_count": len(filtered_data)
},
"results": filtered_data
}, indent=2)
return self._format_human_readable(filtered_data, query_info['target'])
except Exception as e:
self.logger.error(f"Formatting error: {str(e)}")
return f"Error formatting results: {str(e)}"
def _apply_filters(self, data: List[Dict], filters: Dict) -> List[Dict]:
filtered_data = data
if 'link_type' in filters:
filtered_data = [item for item in filtered_data
if item.get('type', '') == filters['link_type']]
if 'file_type' in filters:
filtered_data = [item for item in filtered_data
if item.get('file_type', '').lower() == filters['file_type']]
return filtered_data
def _format_human_readable(self, data: List[Dict], target: str) -> str:
formats = {
'image': self._format_images,
'link': self._format_links,
'text': self._format_texts
}
return formats.get(target, lambda x: "Unknown data type")(data)
def _format_images(self, images: List[Dict]) -> str:
return "\n\n".join(
f"Image {idx+1}:\n"
f"Source: {img['src']}\n"
f"Alt Text: {img['alt']}\n"
f"Dimensions: {img['dimensions']}\n"
f"Type: {img['file_type']}"
for idx, img in enumerate(images)
)
def _format_links(self, links: List[Dict]) -> str:
return "\n\n".join(
f"Link {idx+1}:\n"
f"URL: {link['href']}\n"
f"Text: {link['text']}\n"
f"Type: {link['type']}\n"
f"Contains Image: {'Yes' if link['has_image'] else 'No'}"
for idx, link in enumerate(links)
)
def _format_texts(self, texts: List[Dict]) -> str:
return "\n\n".join(
f"Text Block {idx+1} ({text['source'].upper()}):\n"
f"{text['content']}"
for idx, text in enumerate(texts)
)
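# Illustrative sketch: formatting two hypothetical link records with ResponseFormatter,
# using the same query dict shape produced by QueryAnalyzer.parse_query.
def _example_response_formatting() -> str:
    sample_links = [
        {'href': 'https://example.com/about', 'text': 'About', 'title': '',
         'type': 'internal', 'has_image': False},
        {'href': 'https://partner.example.org', 'text': 'Partner site', 'title': '',
         'type': 'external', 'has_image': True},
    ]
    query_info = {'target': 'link', 'limit': 0, 'filters': {}, 'output': 'Formatted Text'}
    return ResponseFormatter().format_data(sample_links, query_info)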
class Scraper:
"""Core scraping functionality with improved error handling"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
async def fetch_page(self, url: str) -> Optional[str]:
"""Fetch page content with retry mechanism"""
for attempt in range(Config.MAX_RETRIES):
try:
response = self.session.get(url, timeout=Config.TIMEOUT)
response.raise_for_status()
return response.text
except Exception as e:
self.logger.error(f"Attempt {attempt + 1} failed for {url}: {str(e)}")
if attempt == Config.MAX_RETRIES - 1:
return None
async def take_screenshot(self, url: str) -> Optional[bytes]:
"""Take a screenshot of a webpage with improved error handling."""
driver = None
try:
options = Options()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(options=options)
driver.get(url)
# Wait for page load
time.sleep(2)
# Take screenshot
screenshot = driver.get_screenshot_as_png()
# Process image
img = Image.open(io.BytesIO(screenshot))
img = img.convert('RGB') # Convert to RGB to ensure compatibility
# Save to bytes
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format='PNG', optimize=True)
return img_byte_arr.getvalue()
except Exception as e:
logging.error(f"Screenshot error for {url}: {str(e)}")
return None
finally:
if driver:
driver.quit()
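# Illustrative sketch: driving the async Scraper helpers from synchronous code with
# asyncio.run. The URL is a placeholder; fetch_page returns None once Config.MAX_RETRIES
# attempts have failed.
def _example_async_fetch(url: str = "https://example.com") -> Optional[str]:
    scraper = Scraper()
    return asyncio.run(scraper.fetch_page(url))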
class SmartWebScraper:
"""Smart web scraping with natural language processing capabilities"""
def __init__(self):
self.query_analyzer = QueryAnalyzer()
self.data_extractor = DataExtractor()
self.response_formatter = ResponseFormatter()
self.logger = logging.getLogger(__name__)
self.scraped_data = {}
def chat_based_scrape(self, instruction: str, url: str, output_format: str = "Formatted Text") -> str:
"""Process natural language instructions for web scraping"""
try:
if not instruction or not url:
return "Please provide both instruction and URL."
            # Delegate to process_url, which fetches the page, extracts the data that
            # matches the instruction, and returns it already formatted. The output format
            # selected in the UI is passed through so the JSON/Formatted Text choice is honoured.
            return self.process_url(url, instruction, output_format)
except Exception as e:
error_msg = f"Error processing chat-based scrape: {str(e)}"
self.logger.error(error_msg)
return error_msg
    def process_url(self, url: str, query: str, output_format: Optional[str] = None) -> str:
"""Process URL based on query"""
try:
# Validate URL
if not self._validate_url(url):
return "Please provide a valid URL (including http:// or https://)."
# Fetch page
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
# Set page content and store in scraped_data
self.data_extractor.set_page(response.text, url)
self.logger.info(f"Scraping data from URL: {url}") # Log the URL being scraped
images = self.data_extractor.extract_images()
links = self.data_extractor.extract_links()
texts = self.data_extractor.extract_text()
            self.scraped_data[url] = {
                'images': images,
                'links': links,
                'texts': texts
            }
            # Analyze the query and pull out the data matching its target
            query_info = self.query_analyzer.parse_query(query)
            if output_format:
                query_info['output'] = output_format
            data = self._get_data_for_target(query_info['target'], url)
            # Record page-level metadata; reuse the response fetched above instead of
            # requesting the page a second time.
            self.scraped_data[url]['metadata'] = {
                'content_length': len(response.text),
                'timestamp': datetime.now().isoformat()
            }
            # The formatter expects a list of element dicts, so pass only the
            # target-specific data.
            return self.response_formatter.format_data(data, query_info)
except requests.exceptions.RequestException as e:
error_msg = f"Error fetching the webpage: {str(e)}"
self.logger.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"An error occurred: {str(e)}"
self.logger.error(error_msg)
return error_msg
def _validate_url(self, url: str) -> bool:
"""Validate URL format"""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception as e:
self.logger.error(f"URL validation error: {str(e)}")
return False
def _get_data_for_target(self, target: str, url: str) -> List[Dict]:
"""Get specific data based on target type"""
if url not in self.scraped_data:
self.logger.warning(f"No data found for URL: {url}")
return []
if target == 'image':
return self.scraped_data[url]['images']
elif target == 'link':
return self.scraped_data[url]['links']
elif target == 'text':
return self.scraped_data[url]['texts']
else:
self.logger.warning(f"Unknown target type: {target}")
return []
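# Illustrative sketch: an end-to-end chat-based scrape against a placeholder URL. In the
# app this path is triggered from the "Chat-Based Scrape" tab rather than called directly.
def _example_chat_scrape() -> str:
    scraper = SmartWebScraper()
    return scraper.chat_based_scrape("Extract all images", "https://example.com", "Formatted Text")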
def sanitize_filename(filename):
"""Sanitizes a filename by removing invalid characters."""
return re.sub(r'[<>:"/\\|?*\n]+', '_', filename)
def validate_url(url):
"""Validate if the URL is properly formatted."""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception:
return False
def get_latest_data(url):
"""Get the latest HTML content of a webpage."""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status() # Raise an exception for bad status codes
return response.text
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching latest data from {url}: {str(e)}")
return None
def take_screenshot(url):
"""Take a screenshot of a webpage."""
try:
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(options=chrome_options)
driver.get(url)
screenshot = driver.get_screenshot_as_png()
driver.quit()
image = Image.open(io.BytesIO(screenshot))
max_size = (1024, 1024)
image.thumbnail(max_size, Image.LANCZOS)
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG')
return img_byte_arr.getvalue()
except Exception as e:
logging.error(f"Screenshot error for {url}: {str(e)}")
return None
def process_urls(url_input, bulk_toggle, action_radio, max_urls, crawl_depth):
"""Process URLs with crawl depth and change detection."""
try:
urls = re.split(r'[,\n]+', url_input.strip()) if bulk_toggle else [url_input]
urls = [url.strip() for url in urls if url.strip()]
urls = urls[:int(max_urls)]
# Validate URLs
invalid_urls = [url for url in urls if not validate_url(url)]
if invalid_urls:
return None, None, json.dumps({"error": f"Invalid URLs detected: {', '.join(invalid_urls)}"}, indent=2)
scraped_data = []
screenshots = []
changes_log = []
# Create temporary directory for screenshots
temp_dir = Path("temp_screenshots")
temp_dir.mkdir(exist_ok=True)
# Process each URL with progress tracking
total_urls = len(urls)
for idx, url in enumerate(urls):
if not url.startswith(('http://', 'https://')):
url = f'https://{url}'
sanitized_url = sanitize_filename(url)
# Take screenshot
if action_radio in ['Capture image', 'Both']:
screenshot = take_screenshot(url)
if screenshot:
screenshot_path = temp_dir / f"{sanitized_url}.png"
with open(screenshot_path, 'wb') as f:
f.write(screenshot)
                    screenshots.append((str(screenshot_path), url))  # Gallery expects (image, caption)
logger.info(f"Screenshot saved: {screenshot_path}") # Log the saved screenshot path
# Scrape data
if action_radio in ['Scrape data', 'Both']:
html_content = get_latest_data(url)
if html_content:
scraped_data.append({
'url': url,
'content_length': len(html_content),
'timestamp': datetime.now().isoformat()
})
# Create a ZIP file for the screenshots
zip_file_path = temp_dir / "screenshots.zip"
with zipfile.ZipFile(zip_file_path, 'w') as zipf:
            for screenshot_path, _caption in screenshots:
                zipf.write(screenshot_path, arcname=Path(screenshot_path).name)
# Return the results
return str(zip_file_path), screenshots, scraped_data # Return structured data for JSON output
except Exception as e:
logging.error(f"Error in process_urls: {str(e)}")
return None, None, json.dumps({"error": str(e)}, indent=2)
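# Illustrative sketch: calling process_urls directly (outside the Gradio UI) for two
# placeholder URLs, screenshots only. Requires a working headless Chrome; crawl_depth is
# accepted but recursion is not implemented in this version.
def _example_batch_processing():
    zip_path, screenshots, summary = process_urls(
        url_input="https://example.com, https://example.org",
        bulk_toggle=True,
        action_radio="Capture image",
        max_urls=2,
        crawl_depth=0,
    )
    return zip_path, summary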
def create_interface():
"""Create the Gradio interface."""
scraper = SmartWebScraper()
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown(
"""
# 🌐 Enhanced Web Scraper with Change Detection and Chat
Monitor and capture changes in web content automatically. Use the chat interface to interact with scraped data.
"""
)
with gr.Tabs():
with gr.Tab("URL Scrape/Screenshot"):
url_input = gr.Textbox(
label="Enter URL(s)",
placeholder="Enter single URL or multiple URLs separated by commas"
)
with gr.Row():
bulk_toggle = gr.Checkbox(label="Bulk URLs", value=False)
action_radio = gr.Radio(
["Scrape data", "Capture image", "Both"],
label="Select Action",
value="Both"
)
with gr.Row():
max_urls = gr.Slider(
minimum=1,
maximum=20,
value=5,
step=1,
label="Max URLs to process"
)
crawl_depth = gr.Slider(
minimum=0,
maximum=3,
value=1,
step=1,
label="Crawl Depth (0 for no recursion)"
)
process_button = gr.Button("Process URLs", variant="primary")
with gr.Column():
# Add gallery for screenshot preview
gallery = gr.Gallery(
label="Screenshots Preview",
show_label=True,
elem_id="gallery",
columns=[3],
rows=[2],
height="auto",
object_fit="contain" # Add proper image scaling
)
# Download button and results
download_file = gr.File(label="Download Results (ZIP)")
scraped_data_output = gr.JSON(label="Results Summary")
process_button.click(
fn=process_urls,
inputs=[
url_input,
bulk_toggle,
action_radio,
max_urls,
crawl_depth
],
outputs=[
download_file,
gallery,
scraped_data_output
],
show_progress=True
)
with gr.Tab("Chat-Based Scrape"):
instruction = gr.Textbox(
label="Enter Instruction",
placeholder="e.g., 'Scrape all links' or 'Extract all images'"
)
chat_url_input = gr.Textbox(
label="Enter URL",
value="https://example.com",
placeholder="Enter the target URL"
)
output_format = gr.Radio(
["Formatted Text", "JSON"],
label="Output Format",
value="Formatted Text"
)
chat_output = gr.Textbox(label="Output")
chat_button = gr.Button("Execute Instruction", variant="primary")
                chat_button.click(
fn=scraper.chat_based_scrape,
inputs=[instruction, chat_url_input, output_format],
outputs=chat_output
)
gr.Markdown(
"""
### Features
- Bulk URL processing
- Screenshot capture
- Content change detection
- Recursive crawling
- Chat-based instructions for interacting with scraped data
"""
)
return demo
if __name__ == "__main__":
demo = create_interface()
demo.launch(debug=True)