create-llms-txt / app.py
cyberandy's picture
update
1de7c37
raw
history blame
12.5 kB
import gradio as gr
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin, urlparse
import asyncio
import aiohttp
from collections import defaultdict
import unicodedata
import logging
import ssl
import brotli # Add this import
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class WebsiteCrawler:
def __init__(self, max_depth=3, max_pages=50):
self.max_depth = max_depth
self.max_pages = max_pages
self.visited_urls = set()
self.url_metadata = defaultdict(dict)
self.homepage_metadata = None
self.headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}
self.session = None
async def get_session(self):
if self.session is None:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# Configure client with brotli support
connector = aiohttp.TCPConnector(ssl=ssl_context)
self.session = aiohttp.ClientSession(
connector=connector, timeout=aiohttp.ClientTimeout(total=30)
)
return self.session
async def decode_response(self, response):
"""Handle various content encodings including brotli"""
content_encoding = response.headers.get("Content-Encoding", "").lower()
content = await response.read()
if content_encoding == "br":
try:
decoded = brotli.decompress(content)
return decoded.decode("utf-8", errors="ignore")
except Exception as e:
logger.error(f"Error decoding brotli content: {str(e)}")
return content.decode("utf-8", errors="ignore")
elif content_encoding == "gzip":
import gzip
try:
decoded = gzip.decompress(content)
return decoded.decode("utf-8", errors="ignore")
except Exception as e:
logger.error(f"Error decoding gzip content: {str(e)}")
return content.decode("utf-8", errors="ignore")
else:
return content.decode("utf-8", errors="ignore")
def clean_text(self, text, is_title=False):
"""Clean and normalize text"""
if not text:
return ""
# Normalize unicode characters
text = unicodedata.normalize("NFKD", text)
text = re.sub(r"[^\x00-\x7F]+", "", text)
if is_title:
# Remove common suffixes and fragments for titles
text = re.sub(r"\s*[\|\-#:•].*", "", text)
text = re.sub(r"^\s*Welcome to\s+", "", text)
text = text.replace("docusaurus_skipToContent_fallback", "")
return " ".join(text.split()).strip()
async def process_homepage(self, url):
"""Specifically process the homepage to extract key metadata"""
try:
session = await self.get_session()
async with session.get(
url, headers=self.headers, allow_redirects=True
) as response:
if response.status != 200:
raise Exception(
f"Failed to fetch homepage: status {response.status}"
)
text = await self.decode_response(response)
soup = BeautifulSoup(text, "html.parser")
# Extract site name
site_name = None
site_meta = soup.find("meta", property="og:site_name")
if site_meta and site_meta.get("content"):
site_name = site_meta["content"]
if not site_name:
title_tag = soup.find("title")
if title_tag:
site_name = title_tag.text.split("|")[0].strip()
if not site_name:
site_name = urlparse(url).netloc.split(".")[0].capitalize()
# Get homepage description
description = None
meta_desc = soup.find("meta", {"name": "description"})
if meta_desc and meta_desc.get("content"):
description = meta_desc["content"]
if not description:
og_desc = soup.find("meta", property="og:description")
if og_desc and og_desc.get("content"):
description = og_desc["content"]
if not description:
first_p = soup.find("p")
if first_p:
description = first_p.text
self.homepage_metadata = {
"site_name": self.clean_text(site_name, is_title=True),
"description": (
self.clean_text(description) if description else None
),
}
except Exception as e:
logger.error(f"Error processing homepage {url}: {str(e)}")
self.homepage_metadata = {
"site_name": urlparse(url).netloc.split(".")[0].capitalize(),
"description": None,
}
async def crawl_website(self, start_url):
"""Crawl website starting from the given URL"""
try:
# First process the homepage
logger.info(f"Processing homepage: {start_url}")
await self.process_homepage(start_url)
base_domain = urlparse(start_url).netloc
queue = [(start_url, 0)]
seen = {start_url}
while queue and len(self.visited_urls) < self.max_pages:
current_url, depth = queue.pop(0)
if depth > self.max_depth:
continue
logger.info(f"Crawling page: {current_url} (depth: {depth})")
links = await self.crawl_page(current_url, depth, base_domain)
logger.info(f"Found {len(links)} links on {current_url}")
for link in links:
if link not in seen and urlparse(link).netloc == base_domain:
seen.add(link)
queue.append((link, depth + 1))
logger.info(f"Crawl completed. Visited {len(self.visited_urls)} pages")
except Exception as e:
logger.error(f"Error during crawl: {str(e)}")
raise
finally:
await self.cleanup()
def generate_llms_txt(self):
"""Generate llms.txt content"""
if not self.url_metadata:
return "No content was found to generate llms.txt"
# Sort URLs by importance and remove duplicates
sorted_urls = []
seen_titles = set()
for url, metadata in sorted(
self.url_metadata.items(),
key=lambda x: (x[1]["importance"], x[0]),
reverse=True,
):
if metadata["title"] not in seen_titles:
sorted_urls.append((url, metadata))
seen_titles.add(metadata["title"])
if not sorted_urls:
return "No valid content was found"
# Generate content
content = []
# Use homepage metadata for main title and description
main_title = self.homepage_metadata.get("site_name", "Welcome")
homepage_description = self.homepage_metadata.get("description")
content.append(f"# {main_title}")
if homepage_description:
content.append(f"\n> {homepage_description}")
else:
# Fallback to first good description from content
for _, metadata in sorted_urls:
desc = self.clean_description(metadata["description"])
if desc and len(desc) > 20 and "null" not in desc.lower():
content.append(f"\n> {desc}")
break
# Group by category
categories = defaultdict(list)
for url, metadata in sorted_urls:
if metadata["title"] and url:
categories[metadata["category"]].append((url, metadata))
# Add sections
for category in ["Docs", "API", "Guides", "Examples", "Blog", "Optional"]:
if category in categories:
content.append(f"\n## {category}")
# Add links without extra newlines
links = []
for url, metadata in categories[category]:
title = metadata["title"].strip()
desc = self.clean_description(metadata["description"])
if desc:
links.append(f"- [{title}]({url}): {desc}")
else:
links.append(f"- [{title}]({url})")
content.append("\n".join(links))
return "\n".join(content)
# Process URL function (outside the class)
async def process_url(url, max_depth, max_pages):
"""Process URL and generate llms.txt"""
try:
# Add https:// if not present
if not url.startswith(("http://", "https://")):
url = "https://" + url
# Validate URL
result = urlparse(url)
if not all([result.scheme, result.netloc]):
return "", "Invalid URL format. Please enter a valid URL."
logger.info(f"Starting crawl of {url}")
# Process website
crawler = WebsiteCrawler(max_depth=int(max_depth), max_pages=int(max_pages))
await crawler.crawl_website(url)
logger.info("Generating llms.txt content")
content = crawler.generate_llms_txt()
if not content or content.strip() == "":
return "", "No content was generated. Check the logs for details."
return content, f"Successfully crawled {len(crawler.visited_urls)} pages."
except Exception as e:
logger.error(f"Error processing URL {url}: {str(e)}")
return "", f"Error: {str(e)}"
# Create Gradio interface
theme = gr.themes.Soft(primary_hue="blue", font="Open Sans")
with gr.Blocks(
theme=theme,
css="""
@import url('https://fonts.googleapis.com/css2?family=Open+Sans:wght@400;600&display=swap');
.gradio-container {
font-family: 'Open Sans', sans-serif !important;
}
.gr-button {
font-family: 'Open Sans', sans-serif !important;
font-weight: 600 !important;
}
.primary-btn {
background-color: #2436d4 !important;
color: white !important;
}
.primary-btn:hover {
background-color: #1c2aa8 !important;
}
[data-testid="textbox"] {
font-family: 'Open Sans', sans-serif !important;
}
.gr-padded {
font-family: 'Open Sans', sans-serif !important;
}
.gr-input {
font-family: 'Open Sans', sans-serif !important;
}
.gr-label {
font-family: 'Open Sans', sans-serif !important;
}
""",
) as iface:
gr.Markdown("# llms.txt Generator")
gr.Markdown("Generate an llms.txt file from a website following the specification.")
with gr.Row():
url_input = gr.Textbox(
label="Website URL",
placeholder="Enter the website URL (e.g., example.com)",
info="The URL will be automatically prefixed with https:// if not provided",
)
with gr.Row():
with gr.Column():
depth_input = gr.Slider(
minimum=1, maximum=5, value=3, step=1, label="Maximum Crawl Depth"
)
with gr.Column():
pages_input = gr.Slider(
minimum=10, maximum=100, value=50, step=10, label="Maximum Pages"
)
generate_btn = gr.Button("Generate llms.txt", variant="primary")
output = gr.Textbox(
label="Generated llms.txt Content",
lines=20,
show_copy_button=True,
container=True,
)
status = gr.Textbox(label="Status")
generate_btn.click(
fn=lambda url, depth, pages: asyncio.run(process_url(url, depth, pages)),
inputs=[url_input, depth_input, pages_input],
outputs=[output, status],
)
if __name__ == "__main__":
iface.launch()