# Spaces: Running  (page-status banner captured along with the source; not part of the program)
import asyncio
import gzip
import logging
import re
import ssl
import unicodedata
from collections import defaultdict, deque
from urllib.parse import urljoin, urlparse

import aiohttp
import brotli  # used to decode "br" (Brotli) encoded response bodies
import gradio as gr
import requests
from bs4 import BeautifulSoup
# Configure the root logger once at import time so INFO-level crawl progress
# messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the crawler and by the URL-processing callback.
logger = logging.getLogger(__name__)
class WebsiteCrawler:
    """Crawl a single website breadth-first and render an llms.txt summary.

    Typical use is ``await crawler.crawl_website(url)`` followed by
    ``crawler.generate_llms_txt()``.

    NOTE(review): ``crawl_page``, ``cleanup`` and ``clean_description`` are
    called by methods of this class but are not defined in the portion of the
    file visible here; confirm they exist before relying on this class.
    """

    # Section headings emitted by generate_llms_txt, in output order.
    _CATEGORY_ORDER = ("Docs", "API", "Guides", "Examples", "Blog", "Optional")

    def __init__(self, max_depth=3, max_pages=50):
        """Create a crawler.

        Args:
            max_depth: Deepest link distance from the start URL that is crawled.
            max_pages: The crawl loop stops once this many URLs are recorded
                in ``visited_urls``.
        """
        self.max_depth = max_depth
        self.max_pages = max_pages
        # URLs already fetched; only read (never written) by the code visible here.
        self.visited_urls = set()
        # url -> metadata dict; generate_llms_txt reads the keys "title",
        # "description", "category" and "importance".
        self.url_metadata = defaultdict(dict)
        # {"site_name": ..., "description": ...}, set by process_homepage.
        self.homepage_metadata = None
        # Browser-like request headers sent with the homepage request.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
        }
        # Lazily created aiohttp.ClientSession (see get_session).
        self.session = None

    async def get_session(self):
        """Return the shared aiohttp session, creating it on first use.

        The session uses a 30 second total timeout per request.
        """
        if self.session is None:
            # NOTE(security): certificate and hostname verification are turned
            # off so sites with broken TLS setups can still be crawled. This
            # exposes the crawler to man-in-the-middle tampering; do not reuse
            # this session for anything sensitive.
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE
            connector = aiohttp.TCPConnector(ssl=ssl_context)
            self.session = aiohttp.ClientSession(
                connector=connector, timeout=aiohttp.ClientTimeout(total=30)
            )
        return self.session

    async def decode_response(self, response):
        """Read the response body and return it as text.

        aiohttp normally decompresses bodies itself (``auto_decompress``
        defaults to True) while leaving the ``Content-Encoding`` header in
        place, so the header alone does not prove the bytes are still
        compressed. Decompression here is therefore best effort: gzip is only
        attempted when the gzip magic number is present, and a failed brotli
        attempt falls back to the bytes as received.

        Args:
            response: Object exposing ``headers`` and an awaitable ``read()``.

        Returns:
            The body decoded as UTF-8, with undecodable bytes dropped.
        """
        content_encoding = response.headers.get("Content-Encoding", "").lower()
        content = await response.read()
        if content_encoding == "br":
            try:
                content = brotli.decompress(content)
            except Exception as e:
                # Brotli has no magic number to test for, so a failure here
                # most likely means the body was already decompressed.
                logger.debug(f"Body not brotli-compressed, using as-is: {str(e)}")
        elif content_encoding == "gzip" and content[:2] == b"\x1f\x8b":
            try:
                content = gzip.decompress(content)
            except Exception as e:
                logger.error(f"Error decoding gzip content: {str(e)}")
        return content.decode("utf-8", errors="ignore")

    def clean_text(self, text, is_title=False):
        """Normalize text to single-spaced ASCII.

        Args:
            text: Raw text; falsy values yield ``""``.
            is_title: When True, also cut the text at the first ``|``, ``-``,
                ``#``, ``:`` or bullet separator, drop a leading
                "Welcome to", and remove a Docusaurus skip-link artifact.

        Returns:
            The cleaned string. All non-ASCII characters are removed after
            NFKD normalization, so accented letters keep only their base
            letter and other scripts are dropped entirely.
        """
        if not text:
            return ""
        # Decompose accented characters so the base letter survives the
        # ASCII-only filter below.
        text = unicodedata.normalize("NFKD", text)
        text = re.sub(r"[^\x00-\x7F]+", "", text)
        if is_title:
            # Remove common suffixes and fragments for titles
            text = re.sub(r"\s*[\|\-#:•].*", "", text)
            text = re.sub(r"^\s*Welcome to\s+", "", text)
            text = text.replace("docusaurus_skipToContent_fallback", "")
        return " ".join(text.split()).strip()

    @staticmethod
    def _fallback_site_name(url):
        """Derive a site name from the URL host, e.g. ``www.acme.io`` -> ``Acme``."""
        host = urlparse(url).hostname or ""
        # Skip the conventional "www." label so the name is not just "Www".
        if host.startswith("www."):
            host = host[len("www."):]
        return host.split(".")[0].capitalize()

    @staticmethod
    def _meta_content(soup, attrs):
        """Return the non-empty ``content`` of the first matching <meta>, else None."""
        tag = soup.find("meta", attrs=attrs)
        if tag and tag.get("content"):
            return tag["content"]
        return None

    async def process_homepage(self, url):
        """Fetch the homepage and store its site name and description.

        Sets ``self.homepage_metadata``. Any failure (network error, non-200
        status, parse problem) is logged and replaced by a fallback whose
        site name is derived from the URL host and whose description is None;
        no exception propagates to the caller.
        """
        try:
            session = await self.get_session()
            async with session.get(
                url, headers=self.headers, allow_redirects=True
            ) as response:
                if response.status != 200:
                    raise Exception(
                        f"Failed to fetch homepage: status {response.status}"
                    )
                text = await self.decode_response(response)
            soup = BeautifulSoup(text, "html.parser")

            # Site name: og:site_name, then the <title> text before the first
            # "|", then the host name.
            site_name = self._meta_content(soup, {"property": "og:site_name"})
            if not site_name:
                title_tag = soup.find("title")
                if title_tag:
                    site_name = title_tag.text.split("|")[0].strip()
            if not site_name:
                site_name = self._fallback_site_name(url)

            # Description: meta description, then og:description, then the
            # first paragraph on the page.
            description = self._meta_content(
                soup, {"name": "description"}
            ) or self._meta_content(soup, {"property": "og:description"})
            if not description:
                first_p = soup.find("p")
                if first_p:
                    description = first_p.text

            self.homepage_metadata = {
                # Cleaning can reduce a title to "" (e.g. a title made only of
                # separators or non-ASCII text); use the host name then.
                "site_name": self.clean_text(site_name, is_title=True)
                or self._fallback_site_name(url),
                "description": (
                    self.clean_text(description) if description else None
                ),
            }
        except Exception as e:
            logger.error(f"Error processing homepage {url}: {str(e)}")
            self.homepage_metadata = {
                "site_name": self._fallback_site_name(url),
                "description": None,
            }

    async def crawl_website(self, start_url):
        """Crawl breadth-first from ``start_url``, staying on its domain.

        The homepage is processed first, then pages are visited in FIFO order
        until the queue is empty or ``max_pages`` URLs have been recorded in
        ``visited_urls``. Pages deeper than ``max_depth`` are skipped.

        Raises:
            Exception: Any error raised during the crawl is logged and
                re-raised. ``cleanup`` is awaited in every case.
        """
        try:
            # First process the homepage
            logger.info(f"Processing homepage: {start_url}")
            await self.process_homepage(start_url)
            base_domain = urlparse(start_url).netloc
            # deque gives O(1) pops from the front of the FIFO queue.
            queue = deque([(start_url, 0)])
            seen = {start_url}
            while queue and len(self.visited_urls) < self.max_pages:
                current_url, depth = queue.popleft()
                if depth > self.max_depth:
                    continue
                logger.info(f"Crawling page: {current_url} (depth: {depth})")
                links = await self.crawl_page(current_url, depth, base_domain)
                logger.info(f"Found {len(links)} links on {current_url}")
                for link in links:
                    if link not in seen and urlparse(link).netloc == base_domain:
                        seen.add(link)
                        queue.append((link, depth + 1))
            logger.info(f"Crawl completed. Visited {len(self.visited_urls)} pages")
        except Exception as e:
            logger.error(f"Error during crawl: {str(e)}")
            raise
        finally:
            await self.cleanup()

    def generate_llms_txt(self):
        """Render the collected metadata as llms.txt text.

        Output layout: a ``# <site name>`` heading, an optional ``> <description>``
        block quote, then one ``## <category>`` section per category listed in
        ``_CATEGORY_ORDER`` that has entries, each with one markdown link per
        page. Entries whose category is not in that list are not emitted.

        Returns:
            The llms.txt content, or a short explanatory message when no page
            metadata has been collected.
        """
        if not self.url_metadata:
            return "No content was found to generate llms.txt"

        # Highest importance first (URL as tie-breaker); keep only the first
        # page seen for each title.
        ranked = sorted(
            self.url_metadata.items(),
            key=lambda item: (item[1]["importance"], item[0]),
            reverse=True,
        )
        sorted_urls = []
        seen_titles = set()
        for url, metadata in ranked:
            if metadata["title"] not in seen_titles:
                sorted_urls.append((url, metadata))
                seen_titles.add(metadata["title"])
        if not sorted_urls:
            return "No valid content was found"

        # homepage_metadata is None until process_homepage has run; treat that
        # like "nothing known" instead of failing on None.get.
        homepage = self.homepage_metadata or {}
        main_title = homepage.get("site_name") or "Welcome"
        homepage_description = homepage.get("description")

        content = [f"# {main_title}"]
        if homepage_description:
            content.append(f"\n> {homepage_description}")
        else:
            # Fallback to first good description from content
            for _, metadata in sorted_urls:
                desc = self.clean_description(metadata["description"])
                if desc and len(desc) > 20 and "null" not in desc.lower():
                    content.append(f"\n> {desc}")
                    break

        # Group by category
        categories = defaultdict(list)
        for url, metadata in sorted_urls:
            if metadata["title"] and url:
                categories[metadata["category"]].append((url, metadata))

        # Add sections
        for category in self._CATEGORY_ORDER:
            if category in categories:
                content.append(f"\n## {category}")
                # Add links without extra newlines
                links = []
                for url, metadata in categories[category]:
                    title = metadata["title"].strip()
                    desc = self.clean_description(metadata["description"])
                    if desc:
                        links.append(f"- [{title}]({url}): {desc}")
                    else:
                        links.append(f"- [{title}]({url})")
                content.append("\n".join(links))
        return "\n".join(content)
# Process URL function (outside the class)
async def process_url(url, max_depth, max_pages):
    """Crawl ``url`` and return the generated llms.txt plus a status message.

    Args:
        url: Website address typed by the user. Surrounding whitespace is
            ignored and ``https://`` is prepended when no http(s) scheme is
            present.
        max_depth: Maximum crawl depth; converted with ``int()``.
        max_pages: Maximum number of pages; converted with ``int()``.

    Returns:
        A ``(content, status)`` tuple of strings. ``content`` is ``""``
        whenever the URL is invalid, nothing was generated, or an error
        occurred; errors are logged and reported in ``status`` and are not
        raised.
    """
    try:
        # Tolerate a missing value and stray whitespace from the text box;
        # otherwise "  " would become "https://  " and pass validation.
        url = (url or "").strip()
        # Add https:// if not present (scheme check is case-insensitive)
        if not url.lower().startswith(("http://", "https://")):
            url = "https://" + url
        # Validate URL
        result = urlparse(url)
        if not all([result.scheme, result.netloc]):
            return "", "Invalid URL format. Please enter a valid URL."
        logger.info(f"Starting crawl of {url}")
        # Process website
        crawler = WebsiteCrawler(max_depth=int(max_depth), max_pages=int(max_pages))
        await crawler.crawl_website(url)
        logger.info("Generating llms.txt content")
        content = crawler.generate_llms_txt()
        if not content or content.strip() == "":
            return "", "No content was generated. Check the logs for details."
        return content, f"Successfully crawled {len(crawler.visited_urls)} pages."
    except Exception as e:
        logger.error(f"Error processing URL {url}: {str(e)}")
        return "", f"Error: {str(e)}"
# Create Gradio interface
# Layout: URL box, depth/page sliders side by side, a generate button, then
# the generated text and a status line.
theme = gr.themes.Soft(primary_hue="blue", font="Open Sans")
with gr.Blocks(
    theme=theme,
    # Custom CSS: load Open Sans and apply it to the container, buttons,
    # text boxes, inputs and labels.
    css="""
    @import url('https://fonts.googleapis.com/css2?family=Open+Sans:wght@400;600&display=swap');
    .gradio-container {
        font-family: 'Open Sans', sans-serif !important;
    }
    .gr-button {
        font-family: 'Open Sans', sans-serif !important;
        font-weight: 600 !important;
    }
    .primary-btn {
        background-color: #2436d4 !important;
        color: white !important;
    }
    .primary-btn:hover {
        background-color: #1c2aa8 !important;
    }
    [data-testid="textbox"] {
        font-family: 'Open Sans', sans-serif !important;
    }
    .gr-padded {
        font-family: 'Open Sans', sans-serif !important;
    }
    .gr-input {
        font-family: 'Open Sans', sans-serif !important;
    }
    .gr-label {
        font-family: 'Open Sans', sans-serif !important;
    }
    """,
) as iface:
    gr.Markdown("# llms.txt Generator")
    gr.Markdown("Generate an llms.txt file from a website following the specification.")
    with gr.Row():
        url_input = gr.Textbox(
            label="Website URL",
            placeholder="Enter the website URL (e.g., example.com)",
            info="The URL will be automatically prefixed with https:// if not provided",
        )
    with gr.Row():
        with gr.Column():
            depth_input = gr.Slider(
                minimum=1, maximum=5, value=3, step=1, label="Maximum Crawl Depth"
            )
        with gr.Column():
            pages_input = gr.Slider(
                minimum=10, maximum=100, value=50, step=10, label="Maximum Pages"
            )
    generate_btn = gr.Button("Generate llms.txt", variant="primary")
    output = gr.Textbox(
        label="Generated llms.txt Content",
        lines=20,
        show_copy_button=True,
        container=True,
    )
    status = gr.Textbox(label="Status")
    # The click handler is synchronous, so each click drives the async
    # process_url coroutine to completion with its own asyncio.run call.
    generate_btn.click(
        fn=lambda url, depth, pages: asyncio.run(process_url(url, depth, pages)),
        inputs=[url_input, depth_input, pages_input],
        outputs=[output, status],
    )

# Start the web UI only when the file is executed as a script.
if __name__ == "__main__":
    iface.launch()