| import asyncio
|
| from playwright.async_api import async_playwright, Playwright, TimeoutError as PlaywrightTimeoutError
|
| from bs4 import BeautifulSoup, NavigableString, Tag
|
| import re
|
| import os
|
| from urllib.parse import urljoin
|
| import sqlite3
|
| import datetime
|
| import time
|
| import random
|
| import xml.etree.ElementTree as ET
|
|
|
|
|
# SQLite database file that accumulates one row per scraped URL.
DATABASE_FILE = "scraped.db"
# Directory where one Markdown file per successfully scraped page is written.
MARKDOWN_OUTPUT_DIR = "scraped_md"
# Plain-text input file with one URL per line (option 1 in main()).
URLS_FILE = "urls.txt"
# Random politeness-delay bounds (seconds) between consecutive page fetches.
DELAY_MIN_SECONDS = 0.5
DELAY_MAX_SECONDS = 1.0
# Per-page navigation timeout in seconds (converted to ms for Playwright).
NAVIGATION_TIMEOUT_SECONDS = 60
|
|
|
|
|
def init_db():
    """Initializes the SQLite database and creates the necessary table.

    Ensures MARKDOWN_OUTPUT_DIR exists, then creates the scraped_data
    table if it is not already present (idempotent). Each row records one
    scrape attempt: URL, title, Markdown body, HTTP status, error message,
    and an auto-filled timestamp.
    """
    os.makedirs(MARKDOWN_OUTPUT_DIR, exist_ok=True)
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        # conn.execute() is the shortcut for cursor().execute().
        conn.execute('''
            CREATE TABLE IF NOT EXISTS scraped_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT NOT NULL,
                title TEXT,
                full_markdown_content TEXT,
                status_code INTEGER,
                error_message TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    finally:
        # Always release the connection, even if the DDL statement raises.
        conn.close()
    print(f"[*] Database '{DATABASE_FILE}' initialized and '{MARKDOWN_OUTPUT_DIR}' directory ensured.")
|
|
|
def insert_scraped_data(url, title, markdown_content, status_code, error_message=None):
    """Inserts scraped data into the database.

    Args:
        url: The URL that was scraped.
        title: Page title, or an error placeholder string.
        markdown_content: Extracted Markdown body (may be empty).
        status_code: HTTP status of the navigation (0 if it never completed).
        error_message: Optional description of a scrape/parse failure.
    """
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        conn.execute('''
            INSERT INTO scraped_data (url, title, full_markdown_content, status_code, error_message)
            VALUES (?, ?, ?, ?, ?)
        ''', (url, title, markdown_content, status_code, error_message))
        conn.commit()
    finally:
        # Close even when the INSERT fails so connections are never leaked.
        conn.close()
|
|
|
|
|
async def process_page_content(page, url):
    """
    Extract the page's <p> elements as Markdown text with inline links.

    Args:
        page: A Playwright page whose content() returns the final HTML.
        url: The page URL; used to resolve relative link hrefs.

    Returns:
        dict with keys "title", "markdown_content", and "error_message"
        (None on success; on failure title/content are placeholders).
    """
    try:
        soup = BeautifulSoup(await page.content(), 'html.parser')

        title_tag = soup.find('title')
        page_title = title_tag.text if title_tag else 'Untitled_Page_No_JS'

        def render(node):
            # Bare text nodes contribute their stripped text.
            if isinstance(node, NavigableString):
                return str(node).strip()
            # Anchors become inline Markdown links, resolving relative hrefs
            # against the page URL (absolute/mailto/tel/fragment kept as-is).
            if isinstance(node, Tag) and node.name == 'a':
                text = node.get_text(strip=True)
                href = node.get('href')
                if not href:
                    return text
                if not href.startswith(('http://', 'https://', 'mailto:', 'tel:', '#')):
                    href = urljoin(url, href)
                return f"[{text}]({href})"
            # Any other tag contributes its flattened text content.
            return node.get_text(strip=True)

        paragraphs = []
        for p_tag in soup.find_all('p'):
            pieces = [render(node) for node in p_tag.contents]
            # Skip paragraphs that produced nothing but empty strings.
            if any(pieces):
                # Collapse runs of whitespace into single spaces.
                paragraphs.append(re.sub(r'\s+', ' ', " ".join(pieces).strip()))

        return {
            "title": page_title,
            "markdown_content": f"# {page_title}\n\n" + "\n\n".join(paragraphs),
            "error_message": None
        }

    except Exception as e:
        return {
            "title": "Error Processing Content",
            "markdown_content": "",
            "error_message": f"Error during content parsing: {e}"
        }
|
|
|
|
|
def load_urls_from_sitemap(sitemap_path):
    """Parses an XML sitemap file and returns a list of URLs.

    Args:
        sitemap_path: Path to a sitemap using the standard
            http://www.sitemaps.org/schemas/sitemap/0.9 namespace.

    Returns:
        list[str]: The <loc> URLs, whitespace-stripped; an empty list on
        any error (missing file, malformed XML, or unexpected failure).
    """
    urls = []
    try:
        tree = ET.parse(sitemap_path)
        root = tree.getroot()

        namespace = {'sitemap': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
        for url_element in root.findall('sitemap:url', namespace):
            loc_element = url_element.find('sitemap:loc', namespace)
            # Guard against empty <loc/> (text is None) and strip the
            # surrounding whitespace that pretty-printed sitemaps carry.
            if loc_element is not None and loc_element.text:
                loc = loc_element.text.strip()
                if loc:
                    urls.append(loc)
        print(f"[*] Loaded {len(urls)} URLs from sitemap: '{sitemap_path}'")
        return urls
    except FileNotFoundError:
        print(f"ERROR: Sitemap file '{sitemap_path}' not found.")
        return []
    except ET.ParseError as e:
        print(f"ERROR: Failed to parse sitemap '{sitemap_path}': {e}")
        return []
    except Exception as e:
        print(f"An unexpected error occurred while loading sitemap: {e}")
        return []
|
|
|
async def main():
    """Interactive entry point: pick a URL source, then scrape every URL.

    Workflow: initialize the database, prompt the user for a URL source
    (urls.txt or an XML sitemap), then for each URL launch a fresh visible
    Chromium instance with JavaScript disabled, fetch the page, extract
    its <p> content as Markdown, and record the result in SQLite plus an
    optional per-page .md file. Returns early (after a printed message)
    on any input/setup problem.
    """
    init_db()

    urls_to_scrape = []

    print("\n--- URL Source Selection ---")
    print("1. Load URLs from 'urls.txt' (one URL per line)")
    print("2. Load URLs from an XML sitemap file")
    choice = input("Enter your choice (1 or 2): ").strip()

    if choice == '1':
        try:
            # Blank lines in the file are skipped.
            with open(URLS_FILE, 'r', encoding='utf-8') as f:
                urls_to_scrape = [line.strip() for line in f if line.strip()]
            if not urls_to_scrape:
                print(f"WARNING: '{URLS_FILE}' is empty. No URLs to scrape.")
                return
            print(f"[*] Using URLs from '{URLS_FILE}'.")
        except FileNotFoundError:
            print(f"ERROR: '{URLS_FILE}' not found. Please create the file with URLs, one per line.")
            return
    elif choice == '2':
        sitemap_path = input("Enter the path to the XML sitemap file: ").strip()
        if not sitemap_path:
            print("Sitemap path cannot be empty. Exiting.")
            return
        urls_to_scrape = load_urls_from_sitemap(sitemap_path)
        if not urls_to_scrape:
            print("No URLs loaded from sitemap. Exiting.")
            return
    else:
        print("Invalid choice. Please enter 1 or 2.")
        return

    total_urls = len(urls_to_scrape)
    if total_urls == 0:
        print("No URLs available for scraping. Exiting.")
        return

    start_total_time = time.time()

    print(f"--- Starting automated scraping of {total_urls} URLs ---")

    print(f"[*] Browsers will launch non-headless (visible), process the page, and close automatically for each URL.")
    print(f"[*] No manual input required after starting. It will proceed to the next URL after a short delay.")
    print(f"[*] Delay between requests: {DELAY_MIN_SECONDS:.1f} - {DELAY_MAX_SECONDS:.1f} seconds.")
    print(f"[*] Navigation timeout set to {NAVIGATION_TIMEOUT_SECONDS} seconds per page.")

    for i, url in enumerate(urls_to_scrape):
        current_index = i + 1

        # Rolling ETA: average elapsed time per completed URL times the
        # number of URLs remaining.
        elapsed_time = time.time() - start_total_time
        avg_time_per_url = elapsed_time / current_index if current_index > 0 else 0
        remaining_urls = total_urls - current_index
        eta_seconds = remaining_urls * avg_time_per_url
        eta_display = str(datetime.timedelta(seconds=int(eta_seconds)))

        print(f"\n--- Progress: {current_index}/{total_urls} --- ETA: {eta_display} ---")
        print(f"[*] Attempting to navigate to: {url}")

        # Per-URL result state; overwritten below on success or on each
        # specific failure path, then persisted unconditionally.
        browser = None
        status_code = 0
        scraped_data_title = "Not Scraped"
        scraped_data_markdown = ""
        scraped_data_error = "Unknown error"

        try:
            # A fresh Playwright + browser per URL isolates crashes and
            # leaves no shared state between pages.
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=False)
                # JavaScript deliberately disabled: only server-rendered
                # HTML is scraped (see the 'No_JS' title fallback).
                context = await browser.new_context(java_script_enabled=False)

                # Mimic a real Chrome-on-Windows request to reduce the
                # chance of being blocked by anti-bot heuristics.
                await context.set_extra_http_headers({
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
                    "Accept-Language": "en,en-AU;q=0.9,sr;q=0.8,sr-RS;q=0.7,en-GB;q=0.6,en-US;q=0.5,hr;q=0.4",
                    "Cache-Control": "max-age=0",
                    "Sec-Ch-Ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
                    "Sec-Ch-Ua-Mobile": "?0",
                    "Sec-Ch-Ua-Platform": '"Windows"',
                    "Sec-Fetch-Dest": "document",
                    "Sec-Fetch-Mode": "navigate",
                    "Sec-Fetch-Site": "same-origin",
                    "Sec-Fetch-User": "?1",
                    "Upgrade-Insecure-Requests": "1"
                })

                page = await context.new_page()

                try:
                    # Timeout constant is seconds; Playwright expects ms.
                    response = await page.goto(url, wait_until="load", timeout=NAVIGATION_TIMEOUT_SECONDS * 1000)
                    status_code = response.status if response else 0
                    print(f"[*] Page loaded. HTTP Status: {status_code}")

                    if 200 <= status_code < 300:
                        scraped_content_result = await process_page_content(page, url)
                        scraped_data_title = scraped_content_result["title"]
                        scraped_data_markdown = scraped_content_result["markdown_content"]
                        scraped_data_error = scraped_content_result["error_message"]
                        print(f"[*] Content extraction attempted for: {url}")
                    else:
                        # Non-2xx: record the status but extract nothing.
                        scraped_data_title = f"HTTP Error {status_code}"
                        scraped_data_markdown = ""
                        scraped_data_error = f"Navigation failed with status {status_code}"
                        print(f"[*] WARNING: Non-2xx status code: {status_code}")

                except PlaywrightTimeoutError:
                    # 408 used as a synthetic "request timeout" marker.
                    status_code = 408
                    scraped_data_title = "Navigation Timeout"
                    scraped_data_markdown = ""
                    scraped_data_error = f"Navigation timed out after {NAVIGATION_TIMEOUT_SECONDS} seconds."
                    print(f"[*] ERROR: Navigation timed out for {url}")
                except Exception as nav_error:
                    status_code = 0
                    scraped_data_title = "Navigation Error"
                    scraped_data_markdown = ""
                    scraped_data_error = f"Error during navigation: {nav_error}"
                    print(f"[*] ERROR during navigation for {url}: {nav_error}")

                await page.close()
                await context.close()

        except Exception as browser_launch_error:

            scraped_data_title = "Browser Launch Error"
            scraped_data_markdown = ""
            scraped_data_error = f"Browser or context launch failed: {browser_launch_error}"
            print(f"[*] CRITICAL ERROR (Browser/Context Launch) for {url}: {browser_launch_error}")
        finally:
            # NOTE(review): browser was opened inside the async_playwright
            # context, which has already exited (and shut Playwright down)
            # by the time this finally runs — confirm that close() on an
            # already-stopped browser cannot raise here.
            if browser:
                await browser.close()

        # Persist the attempt regardless of success or failure.
        insert_scraped_data(
            url,
            scraped_data_title,
            scraped_data_markdown,
            status_code,
            scraped_data_error
        )
        print(f"[*] Data for {url} saved to '{DATABASE_FILE}'.")

        # Write a .md file only for clean scrapes: non-empty content and
        # no recorded error.
        if scraped_data_markdown and not scraped_data_error:
            # Sanitize the title into a filesystem-safe name, capped at
            # 100 characters.
            safe_filename = re.sub(r'[\\/:*?"<>| ]', '_', scraped_data_title)[:100]
            md_filename = os.path.join(MARKDOWN_OUTPUT_DIR, f"{safe_filename}.md")
            try:
                with open(md_filename, 'w', encoding='utf-8') as f:
                    f.write(scraped_data_markdown)
                print(f"[*] Markdown saved to {md_filename}")
            except Exception as file_error:
                print(f"[*] ERROR: Failed to save MD file for {url}: {file_error}")
        elif scraped_data_error:
            print(f"[*] Skipping MD file creation for {url} due to an error.")

        # Politeness delay between requests (skipped after the last URL).
        if i < total_urls - 1:
            delay = random.uniform(DELAY_MIN_SECONDS, DELAY_MAX_SECONDS)
            print(f"[*] Waiting {delay:.2f} seconds before next URL...")
            await asyncio.sleep(delay)

    end_total_time = time.time()
    total_duration = str(datetime.timedelta(seconds=int(end_total_time - start_total_time)))
    print(f"\n=== Automated scraping process complete! ===")
    print(f"Total URLs processed: {total_urls}")
    print(f"Total duration: {total_duration}")
    print(f"Scraped data saved to '{DATABASE_FILE}' and markdown files in '{MARKDOWN_OUTPUT_DIR}/'.")
|
|
|
| if __name__ == "__main__":
|
| asyncio.run(main()) |