import requests
from bs4 import BeautifulSoup
import os
from urllib.parse import urljoin, urlparse
import time
import zipfile
import tempfile
import shutil
import gradio as gr
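
# Third-party dependencies assumed by this app (typically declared in a
# separate requirements.txt on Hugging Face Spaces): requests, beautifulsoup4,
# gradio.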


def extract_detail_page_links(url, headers):
    """
    Extract all detail page links from the main listing page.

    Args:
        url: Main page URL
        headers: Request headers

    Returns:
        list of detail page URLs
    """
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()

    soup = BeautifulSoup(response.content, 'html.parser')
    detail_links = []
    for link in soup.find_all('a', href=True):
        href = link['href']
        # Look for detail page patterns (adjust pattern as needed)
        if 'Details.aspx' in href or 'PUB_ID=' in href:
            full_url = urljoin(url, href)
            if full_url not in detail_links:
                detail_links.append(full_url)
    return detail_links
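
# A minimal standalone sketch of the helper above (illustrative only; the
# listing URL matches the placeholder used in the Gradio UI further down):
#
#   headers = {"User-Agent": "Mozilla/5.0"}
#   links = extract_detail_page_links(
#       "https://armypubs.army.mil/ProductMaps/PubForm/AR.aspx", headers
#   )
#   print(f"Found {len(links)} detail pages")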


def extract_pdf_links_from_page(url, headers):
    """
    Extract PDF links from a single page.

    Args:
        url: Page URL to scrape
        headers: Request headers

    Returns:
        list of PDF URLs
    """
    try:
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        pdf_links = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            if '.pdf' in href.lower():
                full_url = urljoin(url, href)
                if full_url not in pdf_links:
                    pdf_links.append(full_url)
        return pdf_links
    except Exception as e:
        print(f"Error extracting PDFs from {url}: {str(e)}")
        return []


def download_pdfs_from_page(url, progress=gr.Progress()):
    """
    Download all PDFs from a webpage by navigating through detail pages.

    Args:
        url: The main webpage URL to scrape
        progress: Gradio progress tracker

    Returns:
        tuple of (zip_file_path, summary_message)
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    try:
        # Step 1: Extract detail page links from main page
        progress(0, desc="Fetching main page and extracting detail links...")
        detail_page_links = extract_detail_page_links(url, headers)

        if len(detail_page_links) == 0:
            return None, "❌ No detail page links found on the main page."

        progress(0.1, desc=f"Found {len(detail_page_links)} detail pages to process")

        # Step 2: Visit each detail page and collect PDF links
        all_pdf_links = []
        for idx, detail_url in enumerate(detail_page_links, 1):
            progress(0.1 + (0.3 * idx / len(detail_page_links)),
                     desc=f"[{idx}/{len(detail_page_links)}] Scanning detail page...")
            pdf_links = extract_pdf_links_from_page(detail_url, headers)
            all_pdf_links.extend(pdf_links)
            # Be polite - small delay between page requests
            time.sleep(0.5)

        # Remove duplicates
        all_pdf_links = list(set(all_pdf_links))

        if len(all_pdf_links) == 0:
            return None, f"❌ No PDF links found across {len(detail_page_links)} detail pages."

        progress(0.4, desc=f"Found {len(all_pdf_links)} unique PDFs to download")

        # Step 3: Create temporary directory for downloads
        temp_dir = tempfile.mkdtemp()

        # Step 4: Download each PDF
        successful = 0
        failed = 0
        failed_urls = []

        for idx, pdf_url in enumerate(all_pdf_links, 1):
            # Derive the filename outside the try block so it is always
            # defined for error reporting; fall back to a generic name
            # when the URL path has no basename.
            filename = os.path.basename(urlparse(pdf_url).path) or f"document_{idx}.pdf"
            filepath = os.path.join(temp_dir, filename)

            try:
                # Skip if file already exists
                if os.path.exists(filepath):
                    progress(0.4 + (0.5 * idx / len(all_pdf_links)),
                             desc=f"[{idx}/{len(all_pdf_links)}] Skipping: {filename}")
                    successful += 1
                    continue

                progress(0.4 + (0.5 * idx / len(all_pdf_links)),
                         desc=f"[{idx}/{len(all_pdf_links)}] Downloading: {filename}")

                # Download PDF
                pdf_response = requests.get(pdf_url, headers=headers, timeout=60)
                pdf_response.raise_for_status()

                # Verify it's actually a PDF (the content-type header may carry
                # parameters such as a charset, so use a substring check)
                content_type = pdf_response.headers.get('content-type', '').lower()
                if 'application/pdf' not in content_type and 'application/octet-stream' not in content_type:
                    failed += 1
                    failed_urls.append(f"{filename}: Not a valid PDF file")
                    continue

                # Save PDF
                with open(filepath, 'wb') as f:
                    f.write(pdf_response.content)

                successful += 1
                time.sleep(1)  # Be polite

            except Exception as e:
                failed += 1
                failed_urls.append(f"{filename}: {str(e)}")
                continue

        # Step 5: Generate summary
        summary = f"""
✅ **Download Complete!**

📊 **Summary:**
- Detail pages scanned: {len(detail_page_links)}
- Total PDFs found: {len(all_pdf_links)}
- Successfully downloaded: {successful}
- Failed: {failed}
"""

        if failed > 0:
            summary += "\n\n⚠️ **Failed Downloads:**\n"
            for fail in failed_urls[:10]:
                summary += f"- {fail}\n"
            if len(failed_urls) > 10:
                summary += f"- ... and {len(failed_urls) - 10} more\n"

        # Step 6: Create zip file
        progress(0.9, desc="Creating zip file...")
        zip_path = os.path.join(tempfile.gettempdir(), f"pdfs_{int(time.time())}.zip")

        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, dirs, files in os.walk(temp_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    zipf.write(file_path, arcname=file)

        # Clean up
        shutil.rmtree(temp_dir)

        progress(1.0, desc="Complete!")
        return zip_path, summary

    except requests.exceptions.RequestException as e:
        return None, f"❌ Error fetching webpage: {str(e)}"
    except Exception as e:
        return None, f"❌ Unexpected error: {str(e)}"


# Create Gradio interface
def create_interface():
    with gr.Blocks(title="PDF Downloader", theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            """
            # 📥 Two-Level PDF Downloader

            Download all PDFs from webpages with intermediate detail pages!

            **Instructions:**
            1. Enter the URL of the main listing page
            2. Click "Download PDFs"
            3. The tool will navigate through all detail pages
            4. Download your ZIP file with all PDFs
            """
        )

        with gr.Row():
            with gr.Column():
                url_input = gr.Textbox(
                    label="Main Page URL",
                    placeholder="https://armypubs.army.mil/ProductMaps/PubForm/AR.aspx",
                    lines=1
                )
                download_btn = gr.Button("📥 Download PDFs", variant="primary", size="lg")

        with gr.Row():
            with gr.Column():
                output_file = gr.File(label="Download ZIP")
                summary_output = gr.Markdown(label="Summary")

        download_btn.click(
            fn=download_pdfs_from_page,
            inputs=[url_input],
            outputs=[output_file, summary_output]
        )

        gr.Markdown(
            """
            ---
            ### 💡 Features:
            - **Two-level navigation**: Scans main page → visits detail pages → downloads PDFs
            - **Duplicate removal**: Ensures each PDF is downloaded only once
            - **Polite scraping**: Includes delays between requests
            - **Error handling**: Continues even if some downloads fail
            - **Progress tracking**: Real-time updates on scanning and downloading
            """
        )

    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch(share=True)
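
# For quick testing without the web UI, the scraping pipeline can be driven
# directly (a sketch, not part of the app's documented interface); passing a
# no-op progress callable avoids depending on a live Gradio event:
#
#   zip_path, summary = download_pdfs_from_page(
#       "https://armypubs.army.mil/ProductMaps/PubForm/AR.aspx",  # example URL
#       progress=lambda *args, **kwargs: None,                    # no-op stub
#   )
#   print(summary)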