from __future__ import annotations

import os

# Set the Hugging Face cache location before importing libraries that read HF_HOME.
os.environ['HF_HOME'] = '/tmp/hf_cache'
os.makedirs(os.environ['HF_HOME'], exist_ok=True)  # Ensure the directory exists

import csv
import json
import re
import subprocess
import tempfile

import gradio as gr
import markdown_pdf  # Used for PDF conversion

from rag_scraper.scraper import Scraper
from rag_scraper.converter import Converter
from rag_scraper.link_extractor import LinkExtractor, LinkType
from rag_scraper.utils import URLUtils
def is_github_repo(url_or_id):
    """Check whether the input is a GitHub repository URL or a "user/repo" ID."""
    if "github.com" in url_or_id:
        return True
    if re.match(r'^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+$', url_or_id):
        return True
    return False
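# Illustrative (not executed): both accepted input forms pass this check.
#   is_github_repo("https://github.com/gradio-app/gradio")  -> True
#   is_github_repo("gradio-app/gradio")                      -> True
#   is_github_repo("https://example.com")                    -> False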
def check_repomix_installed():
    """Check whether the Repomix CLI is available on PATH."""
    try:
        result = subprocess.run(["repomix", "--version"],
                                capture_output=True, text=True, check=False)
        return result.returncode == 0
    except Exception:
        return False
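# Repomix is a Node.js CLI; this check assumes it has been installed globally in the
# container (for example via `npm install -g repomix`) so that it is on PATH.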
def run_repomix(repo_url_or_id, progress=gr.Progress(track_tqdm=True)):
    """Run Repomix on the GitHub repository and return (content, output_file_path)."""
    progress(0, desc="Starting Repomix processing...")
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            output_file_name = "repomix-output.md"
            output_file_path = os.path.join(temp_dir, output_file_name)
            # Accept both full URLs and "user/repo" IDs.
            if '/' in repo_url_or_id and not repo_url_or_id.startswith('http'):
                repo_url = f"https://github.com/{repo_url_or_id}"
            else:
                repo_url = repo_url_or_id
            progress(0.2, desc=f"Running Repomix on {repo_url}...")
            cmd = [
                "repomix",
                "--remote", repo_url,
                "--output", output_file_path,
                "--style", "markdown",
                "--compress"
            ]
            process = subprocess.run(cmd, capture_output=True, text=True, check=False, encoding='utf-8')
            progress(0.8, desc="Repomix command executed.")
            if process.returncode != 0:
                error_details = f"Return Code: {process.returncode}\nStderr: {process.stderr}\nStdout: {process.stdout}"
                return f"Error running Repomix:\n{error_details}", None
            if os.path.exists(output_file_path):
                with open(output_file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                progress(1, desc="Repomix output processed.")
                # Note: output_file_path lives inside temp_dir, which is deleted when this
                # block exits; callers should rely on the returned content.
                return content, output_file_path
            else:
                error_details = f"Return Code: {process.returncode}\nStderr: {process.stderr}\nStdout: {process.stdout}"
                return f"Error: Repomix did not generate an output file at '{output_file_path}'.\nRepomix Output:\n{error_details}", None
    except Exception as e:
        progress(1, desc="Error during Repomix processing.")
        return f"Error processing GitHub repository: {str(e)}", None
def scrape_and_convert_website(url, depth, progress=gr.Progress(track_tqdm=True)):
    """Recursively fetch HTML, extract internal links, and convert pages to Markdown."""
    progress(0, desc=f"Starting web scrape for {url}...")
    visited_urls = set()

    def recursive_scrape(current_url, current_depth, total_links_estimate=1, link_index=0):
        if current_url in visited_urls or current_depth < 0:
            return ""
        visited_urls.add(current_url)
        try:
            progress_val = link_index / total_links_estimate if total_links_estimate > 0 else 0
            progress(progress_val, desc=f"Scraping: {current_url} (Depth: {depth - current_depth})")
            html_content = Scraper.fetch_html(current_url)
        except Exception as e:
            return f"Error fetching {current_url}: {str(e)}\n"
        markdown_content = f"## Extracted from: {current_url}\n\n"
        markdown_content += Converter.html_to_markdown(
            html=html_content,
            base_url=current_url,
            parser_features='html.parser',
            ignore_links=True
        )
        page_content = markdown_content + "\n\n"
        if current_depth > 0:
            try:
                links = LinkExtractor.scrape_url(current_url, link_type=LinkType.INTERNAL)
                valid_links = [
                    link for link in links
                    if URLUtils.is_internal(link, current_url) and link not in visited_urls
                ]
                num_links = len(valid_links)
                for i, link_url in enumerate(valid_links):
                    page_content += recursive_scrape(link_url, current_depth - 1, num_links, i)
            except Exception as e:
                page_content += f"Error extracting links from {current_url}: {str(e)}\n"
        return page_content

    all_markdown_content = recursive_scrape(url, depth)
    progress(1, desc="Web scraping complete.")
    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".md", encoding="utf-8") as tmp_file:
        tmp_file.write(all_markdown_content)
    return all_markdown_content, tmp_file.name
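# Illustrative (not executed): direct use of the scraper, assuming the rag_scraper
# package is importable.
#   markdown_text, md_path = scrape_and_convert_website("https://example.com", depth=1)
#   print(markdown_text[:200], "->", md_path)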
def convert_to_json(markdown_content, source_url_or_id):
    """Wrap the scraped Markdown in a simple {source, content} JSON document."""
    data = {"source": source_url_or_id, "content": markdown_content}
    return json.dumps(data, indent=2)
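# Illustrative output shape of convert_to_json:
#   {
#     "source": "https://example.com",
#     "content": "## Extracted from: https://example.com\n\n..."
#   }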
def convert_to_csv(markdown_content, source_url_or_id):
    """Write a single-row CSV file (source, content) and return its path."""
    output = tempfile.NamedTemporaryFile(mode='w+', delete=False, newline='', suffix=".csv", encoding="utf-8")
    writer = csv.writer(output)
    writer.writerow(["source", "content"])
    writer.writerow([source_url_or_id, markdown_content])
    output.close()
    return output.name
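# Illustrative output shape of convert_to_csv (one header row plus one data row;
# the content cell is quoted by csv.writer because it contains newlines):
#   source,content
#   https://example.com,"## Extracted from: https://example.com ..."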
def save_output_to_file(content, output_format, source_url_or_id):
    """Save content to a temporary file in the requested format and return its path."""
    processed_content = content  # Default for Markdown and Text
    if output_format == "JSON":
        suffix = ".json"
        processed_content = convert_to_json(content, source_url_or_id)
    elif output_format == "CSV":
        # convert_to_csv writes the file itself and returns its path.
        return convert_to_csv(content, source_url_or_id)
    elif output_format == "Text":
        suffix = ".txt"
    elif output_format == "PDF":
        # PDF conversion writes its file directly instead of falling through below.
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf_file:
                pdf_output_path = tmp_pdf_file.name
            # Assumes the markdown-pdf package API: build sections, then save to a path.
            md_pdf = markdown_pdf.MarkdownPdf(toc_level=2)
            md_pdf.add_section(markdown_pdf.Section(content))
            md_pdf.save(pdf_output_path)
            return pdf_output_path
        except Exception as e:
            print(f"PDF conversion failed: {e}. Saving as Markdown instead.")
            suffix = ".pdf.md"
            # processed_content is already Markdown, so no further change is needed.
    else:  # Default to Markdown
        suffix = ".md"
    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=suffix, encoding="utf-8") as tmp_file:
        tmp_file.write(processed_content)
    return tmp_file.name
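# Illustrative (not executed): sample calls with hypothetical content.
#   save_output_to_file("# Title\n\nBody", "Markdown", "https://example.com")  # -> /tmp/....md
#   save_output_to_file("# Title\n\nBody", "JSON", "https://example.com")      # -> /tmp/....json
# CSV and PDF requests return their own file paths from the branches above.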
def process_input_updated(url_or_id, source_type, depth, output_format_selection, progress=gr.Progress(track_tqdm=True)):
    """Top-level handler: dispatch to the scraper or Repomix, then convert and save the result."""
    progress(0, desc="Initializing...")
    raw_content = ""
    error_message = ""
    output_file_path = None
    if source_type == "GitHub Repository":
        if not check_repomix_installed():
            error_message = "Repomix is not installed or not accessible. Please ensure it's installed globally."
            return error_message, None, None
        raw_content, _ = run_repomix(url_or_id, progress=progress)
        if raw_content.startswith("Error"):
            error_message = raw_content
            raw_content = ""
    elif source_type == "Webpage":
        raw_content, _ = scrape_and_convert_website(url_or_id, depth, progress=progress)
        if raw_content.startswith("Error"):
            error_message = raw_content
            raw_content = ""
    else:
        error_message = "Invalid source type selected."
        return error_message, None, None
    if error_message:
        return error_message, None, None
    try:
        progress(0.9, desc=f"Converting to {output_format_selection}...")
        output_file_path = save_output_to_file(raw_content, output_format_selection, url_or_id)
        preview_content = raw_content
        if output_format_selection == "JSON":
            preview_content = convert_to_json(raw_content, url_or_id)
        elif output_format_selection == "CSV" and output_file_path:
            try:
                with open(output_file_path, 'r', encoding='utf-8') as f_csv:
                    csv_preview_lines = [next(f_csv) for _ in range(5)]
                preview_content = "".join(csv_preview_lines)
                if not preview_content:
                    preview_content = "[CSV content is empty or very short]"
            except StopIteration:
                # Fewer than five lines: read whatever is there.
                with open(output_file_path, 'r', encoding='utf-8') as f_csv:
                    preview_content = f_csv.read()
                if not preview_content:
                    preview_content = "[CSV content is empty]"
            except Exception as e_csv_preview:
                preview_content = f"[Error reading CSV for preview: {str(e_csv_preview)}]"
        elif output_format_selection == "CSV" and not output_file_path:
            preview_content = "[CSV file path not available for preview]"
        elif output_format_selection == "PDF":
            preview_content = f"[PDF generated. Download to view: {os.path.basename(output_file_path if output_file_path else 'file.pdf')}]"
            # save_output_to_file falls back to a ".pdf.md" file when PDF conversion fails.
            if output_file_path and output_file_path.endswith(".pdf.md"):
                preview_content = raw_content + "\n\n[Note: PDF conversion failed, showing Markdown. File saved as .pdf.md]"
        progress(1, desc="Processing complete.")
        return f"Successfully processed: {url_or_id}", preview_content, output_file_path
    except Exception as e:
        return f"Error during file conversion/saving: {str(e)}", raw_content, None
with gr.Blocks(title="RAG-Ready Content Scraper", theme="CultriX/gradio-theme") as iface:
    gr.Markdown("# RAG-Ready Content Scraper")
    gr.Markdown(
        "Scrape webpage content or GitHub repositories to generate RAG-ready datasets."
    )
    with gr.Row():
        with gr.Column(scale=2):
            url_input = gr.Textbox(
                label="Enter URL or GitHub Repository ID",
                placeholder="e.g., https://example.com OR username/repo"
            )
            source_type_input = gr.Radio(
                choices=["Webpage", "GitHub Repository"],
                value="Webpage",
                label="Select Source Type"
            )
            depth_input = gr.Slider(
                minimum=0, maximum=3, step=1, value=0,
                label="Scraping Depth (for Webpages)",
                info="0: Only main page. Ignored for GitHub repos."
            )
            output_format_input = gr.Dropdown(
                choices=["Markdown", "JSON", "CSV", "Text", "PDF"],
                value="Markdown",
                label="Select Output Format"
            )
            submit_button = gr.Button("Process Content", variant="primary")
        with gr.Column(scale=3):
            status_output = gr.Textbox(label="Status", interactive=False)
            preview_output = gr.Code(label="Preview Content", language="markdown", interactive=False)
            file_download_output = gr.File(label="Download Processed File", interactive=False)
    gr.Examples(
        examples=[
            ["https://gradio.app/docs/js", "Webpage", 1, "Markdown"],
            ["gradio-app/gradio", "GitHub Repository", 0, "Text"],
            ["https://en.wikipedia.org/wiki/Retrieval-augmented_generation", "Webpage", 0, "JSON"],
        ],
        inputs=[url_input, source_type_input, depth_input, output_format_input],
        outputs=[status_output, preview_output, file_download_output],
        fn=process_input_updated,
        cache_examples=False
    )
    with gr.Accordion("How it Works & More Info", open=False):
        gr.Markdown(
            """
            **Webpage Scraping:**
            1. Enter a full URL (e.g., `https://example.com`).
            2. Select "Webpage" as the source type.
            3. Set the desired scraping depth.
            4. Choose your output format.

            **GitHub Repository Processing:**
            1. Enter a GitHub repository URL or ID (e.g., `username/repo`).
            2. Select "GitHub Repository" (depth is ignored).
            3. Choose your output format. Processing uses **Repomix**.

            **Output Formats:** Markdown, JSON, CSV, Text, PDF.

            **Note:** PDF generation requires the `markdown-pdf` library.
            This app is designed for Docker/Hugging Face Spaces.
            [View Source Code on HuggingFace Spaces](https://huggingface.co/spaces/CultriX/RAG-Scraper)
            """
        )
    submit_button.click(
        fn=process_input_updated,
        inputs=[url_input, source_type_input, depth_input, output_format_input],
        outputs=[status_output, preview_output, file_download_output],
    )

if __name__ == "__main__":
    iface.launch()
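# Illustrative (not executed): for a plain Docker deployment you would typically bind to all
# interfaces and a fixed port, e.g. iface.launch(server_name="0.0.0.0", server_port=7860);
# the default launch() above is sufficient on Hugging Face Spaces.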