# scrape.py
from fastapi import FastAPI, HTTPException, Request, Response
from pydantic import BaseModel
from typing import Optional
import base64
import json
import asyncio
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from fastapi.responses import FileResponse
import os
import tempfile
import uuid
from urllib.parse import urlparse
from starlette.background import BackgroundTask
app = FastAPI(title="Web Analyzer API")
class ScreenshotResponse(BaseModel):
screenshot: str
class MetadataResponse(BaseModel):
title: Optional[str]
description: Optional[str]
og: dict
twitter: dict
canonical: Optional[str]
# Optional wrapper to enforce a global timeout on any coroutine
# (defined here but not wired into the endpoints below)
async def timeout_wrapper(coro, timeout=20):
try:
return await asyncio.wait_for(coro, timeout)
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="Operation timed out")
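# A sketch of how timeout_wrapper could guard a slow endpoint; the
# /metadata-safe route and the 30s budget are illustrative, not part of the API:
#
#     @app.get("/metadata-safe")
#     async def get_metadata_safe(url: str):
#         return await timeout_wrapper(get_metadata(url), timeout=30)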
# More robust get_page() with fallbacks, stealth, and logging
async def get_page(url):
print(f"[INFO] Visiting URL: {url}")
pw = await async_playwright().start()
browser = await pw.chromium.launch(headless=True)
context = await browser.new_context()
# Stealth mode: prevent simple headless detection
await context.add_init_script(
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
)
page = await context.new_page()
page.set_default_timeout(20000) # 20s max for waits on elements
try:
try:
print("[INFO] Trying to load with 'domcontentloaded'")
await page.goto(url, wait_until="domcontentloaded", timeout=20000)
except PlaywrightTimeoutError:
print("[WARN] domcontentloaded failed, trying 'load'")
await page.goto(url, wait_until="load", timeout=20000)
try:
await page.wait_for_selector("body", timeout=5000)
except Exception:
print("[WARN] <body> not found quickly. May still continue.")
except Exception as e:
print(f"[ERROR] Page load failed for {url}: {e}")
await browser.close()
await pw.stop()
raise HTTPException(status_code=504, detail=f"Page load failed: {str(e)}")
print("[INFO] Page loaded successfully.")
return page, browser, pw
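# Callers own the cleanup of everything get_page() returns. Every endpoint
# below follows this pattern (sketch):
#
#     page, browser, pw = await get_page("https://example.com")
#     try:
#         ...  # interact with the page
#     finally:
#         await browser.close()
#         await pw.stop()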
@app.middleware("http")
async def remove_leaky_headers(request: Request, call_next):
response: Response = await call_next(request)
    # Remove proxy headers that can leak infrastructure details; absent headers are skipped
for header in [
"link",
"x-proxied-host",
"x-proxied-path",
"x-proxied-replica",
"server"
]:
try:
del response.headers[header]
except KeyError:
pass # Header not present
# Add your own branded header
response.headers["server"] = "Webrify-Secure-Gateway"
return response
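# Quick way to verify the header rewrite once the server is running
# (assumes a local uvicorn instance on port 8000):
#
#     curl -sI "http://localhost:8000/metadata?url=https://example.com" | grep -i '^server'
#     # server: Webrify-Secure-Gateway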
@app.get("/metadata", response_model=MetadataResponse)
async def get_metadata(url: str):
page, browser, pw = await get_page(url)
try:
title = await page.title()
        # Query first: page.get_attribute() waits up to the 20s default timeout
        # for a missing element, which stalls pages without a description tag
        desc_el = await page.query_selector("meta[name='description']")
        desc = await desc_el.get_attribute("content") if desc_el else None
        # Extract Open Graph metadata
        og = {}
        for prop in ["title", "description", "image"]:
            el = await page.query_selector(f"meta[property='og:{prop}']")
            og[f"og:{prop}"] = await el.get_attribute("content") if el else None
        # Extract Twitter Card metadata
        twitter = {}
        for prop in ["title", "description", "image"]:
            el = await page.query_selector(f"meta[name='twitter:{prop}']")
            twitter[f"twitter:{prop}"] = await el.get_attribute("content") if el else None
        # Get canonical URL, again without waiting on a missing element
        canonical_el = await page.query_selector("link[rel='canonical']")
        canonical = await canonical_el.get_attribute("href") if canonical_el else None
return {
"title": title,
"description": desc,
"og": og,
"twitter": twitter,
"canonical": canonical
}
finally:
await browser.close()
await pw.stop()
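# Illustrative /metadata response for a minimal page (actual values depend
# entirely on the target URL):
#
#     {
#       "title": "Example Domain",
#       "description": null,
#       "og": {"og:title": null, "og:description": null, "og:image": null},
#       "twitter": {"twitter:title": null, "twitter:description": null, "twitter:image": null},
#       "canonical": null
#     }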
# @app.get("/screenshot", response_model=ScreenshotResponse)
# async def get_screenshot(url: str):
# page, browser, pw = await get_page(url)
# try:
# image_bytes = await page.screenshot(full_page=True)
# image_base64 = base64.b64encode(image_bytes).decode()
# return {"screenshot": image_base64}
# finally:
# await browser.close()
# await pw.stop()
# @app.get("/screenshot", response_model=ScreenshotResponse)
# async def get_screenshot(url: str):
# page, browser, pw = await get_page(url)
# try:
# # Scroll to bottom to trigger lazy-loaded content
# await page.evaluate("""
# () => {
# return new Promise((resolve) => {
# let totalHeight = 0;
# const distance = 100;
# const timer = setInterval(() => {
# window.scrollBy(0, distance);
# totalHeight += distance;
# if (totalHeight >= document.body.scrollHeight) {
# clearInterval(timer);
# resolve();
# }
# }, 100);
# });
# }
# """)
# # Give time for images and content to load
# await page.wait_for_timeout(2000)
# image_bytes = await page.screenshot(full_page=True)
# image_base64 = base64.b64encode(image_bytes).decode()
# return {"screenshot": image_base64}
# finally:
# await browser.close()
# await pw.stop()
@app.get("/screenshot", response_model=ScreenshotResponse)
async def get_screenshot(url: str):
page, browser, pw = await get_page(url)
try:
        # get_page() has already navigated, so wait for the network to settle
        # instead of issuing a second goto()
        try:
            await page.wait_for_load_state("networkidle", timeout=30000)
        except PlaywrightTimeoutError:
            pass  # Some pages never go fully idle; continue with what loaded
        # Wait for the header (or a similar landmark) to render
        try:
            await page.wait_for_selector("header", timeout=10000)
        except PlaywrightTimeoutError:
            pass  # Don't fail if the page has no header
# Remove sticky or fixed header issues before full-page screenshot
await page.add_style_tag(content="""
* {
scroll-behavior: auto !important;
}
header, .sticky, .fixed, [style*="position:fixed"] {
position: static !important;
top: auto !important;
}
""")
# Scroll down to trigger lazy loading
await page.evaluate("""
() => {
return new Promise((resolve) => {
let totalHeight = 0;
const distance = 100;
const timer = setInterval(() => {
window.scrollBy(0, distance);
totalHeight += distance;
if (totalHeight >= document.body.scrollHeight) {
clearInterval(timer);
resolve();
}
}, 100);
});
}
""")
# Wait to ensure lazy content and animations complete
await page.wait_for_timeout(2000)
# Take full-page screenshot
image_bytes = await page.screenshot(full_page=True)
image_base64 = base64.b64encode(image_bytes).decode()
return {"screenshot": image_base64}
finally:
await browser.close()
await pw.stop()
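# Client-side decoding sketch for /screenshot (assumes the requests package;
# host and port are illustrative):
#
#     import base64, requests
#     r = requests.get("http://localhost:8000/screenshot",
#                      params={"url": "https://example.com"}, timeout=120)
#     with open("page.png", "wb") as f:
#         f.write(base64.b64decode(r.json()["screenshot"]))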
@app.get("/seo")
async def seo_audit(url: str):
page, browser, pw = await get_page(url)
try:
h1_count = await page.locator("h1").count()
imgs = await page.query_selector_all("img")
missing_alts = [await img.get_attribute("src") for img in imgs if not await img.get_attribute("alt")]
        anchors = await page.query_selector_all("a[href]")
        internal, external = 0, 0
        # Compare hostnames instead of substrings so a foreign URL that merely
        # contains the target URL is not miscounted as internal
        base_host = urlparse(url).netloc
        for a in anchors:
            href = await a.get_attribute("href")
            if not href:
                continue
            if href.startswith("http"):
                if urlparse(href).netloc == base_host:
                    internal += 1
                else:
                    external += 1
            elif href.startswith("/"):
                internal += 1  # site-relative links always point at the same host
        # Query first so a missing tag returns None immediately instead of
        # waiting out the 20s default timeout in page.get_attribute()
        robots_el = await page.query_selector("meta[name='robots']")
        robots = await robots_el.get_attribute("content") if robots_el else None
        canonical_el = await page.query_selector("link[rel='canonical']")
        canonical = await canonical_el.get_attribute("href") if canonical_el else None
return {
"h1_count": h1_count,
"missing_image_alts": missing_alts,
"internal_links": internal,
"external_links": external,
"robots_meta": robots,
"has_canonical": bool(canonical)
}
finally:
await browser.close()
await pw.stop()
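# Illustrative /seo response (values depend on the target page):
#
#     {"h1_count": 1, "missing_image_alts": [], "internal_links": 12,
#      "external_links": 3, "robots_meta": null, "has_canonical": true}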
@app.get("/performance")
async def performance_metrics(url: str):
page, browser, pw = await get_page(url)
try:
# Get navigation timing
try:
nav_timing = await page.evaluate("JSON.stringify(performance.getEntriesByType('navigation'))")
timing = json.loads(nav_timing)[0] if nav_timing else {}
page_load_time = timing.get('duration', None)
except Exception:
page_load_time = None
# Get First Contentful Paint
try:
fcp = await page.evaluate("performance.getEntriesByName('first-contentful-paint')[0]?.startTime")
except Exception:
fcp = None
        # LCP and CLS entries are only exposed through PerformanceObserver
        # (getEntriesByType returns nothing for them), so use buffered observers
        # and give them a short window to deliver entries
        try:
            lcp, cls = await page.evaluate("""
                () => new Promise((resolve) => {
                    let lcp = null, cls = 0;
                    new PerformanceObserver((list) => {
                        for (const e of list.getEntries()) lcp = e.renderTime || e.loadTime;
                    }).observe({type: 'largest-contentful-paint', buffered: true});
                    new PerformanceObserver((list) => {
                        for (const e of list.getEntries()) {
                            if (!e.hadRecentInput) cls += e.value;
                        }
                    }).observe({type: 'layout-shift', buffered: true});
                    setTimeout(() => resolve([lcp, cls]), 500);
                })
            """)
        except Exception:
            lcp, cls = None, None
return {
"page_load_time_ms": page_load_time,
"first_contentful_paint": fcp,
"largest_contentful_paint": lcp,
"cumulative_layout_shift": cls
}
finally:
await browser.close()
await pw.stop()
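# All /performance timings are reported in milliseconds relative to navigation
# start; cumulative_layout_shift is a unitless score. Smoke test (hypothetical
# local run):
#
#     curl "http://localhost:8000/performance?url=https://example.com"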
@app.get("/structured-data")
async def structured_data(url: str):
page, browser, pw = await get_page(url)
try:
scripts = await page.query_selector_all("script[type='application/ld+json']")
json_ld_list = []
        for s in scripts:
            # text_content() reads the raw script body; inner_text() is meant
            # for rendered text
            text = await s.text_content()
            try:
                json_ld_list.append(json.loads(text))
            except Exception:
                continue
        # Collect @type values, including JSON-LD documents that are top-level arrays
        types = []
        for obj in json_ld_list:
            items = obj if isinstance(obj, list) else [obj]
            for item in items:
                if isinstance(item, dict) and "@type" in item:
                    types.append(item["@type"])
return {
"schema_found": bool(json_ld_list),
"types": types,
"schema": json_ld_list
}
finally:
await browser.close()
await pw.stop()
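# Illustrative result: a page embedding
#
#     <script type="application/ld+json">
#     {"@context": "https://schema.org", "@type": "Article", "headline": "Hello"}
#     </script>
#
# would yield schema_found=True and types=["Article"].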
@app.get("/accessibility")
async def accessibility_check(url: str):
page, browser, pw = await get_page(url)
try:
imgs = await page.query_selector_all("img")
missing_alt = len([img for img in imgs if not await img.get_attribute("alt")])
buttons = await page.query_selector_all("button")
missing_labels = len([b for b in buttons if not await b.get_attribute("aria-label") and not await b.inner_text()])
landmarks = []
for tag in ["main", "nav", "footer", "header"]:
if await page.query_selector(tag):
landmarks.append(tag)
return {
"images_missing_alt": missing_alt,
"buttons_missing_label": missing_labels,
"landmarks": landmarks
}
finally:
await browser.close()
await pw.stop()
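# These accessibility checks are heuristics, not a full audit. For WCAG-level
# coverage, a common approach is injecting axe-core into the page (sketch;
# the CDN URL is an assumption, verify it before relying on it):
#
#     await page.add_script_tag(url="https://cdn.jsdelivr.net/npm/axe-core@4/axe.min.js")
#     results = await page.evaluate("() => axe.run()")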
@app.get("/html-to-pdf")
async def convert_html_to_pdf(url: str):
    filename = f"{uuid.uuid4().hex}.pdf"
    output_path = os.path.join(tempfile.gettempdir(), filename)  # portable temp dir
    pw = await async_playwright().start()
    browser = await pw.chromium.launch(headless=True)
    page = await browser.new_page()
    try:
        await page.goto(url, wait_until="networkidle", timeout=90000)
        await page.pdf(
            path=output_path,
            format="A4",
            print_background=True,
            margin={"top": "1cm", "bottom": "1cm", "left": "1cm", "right": "1cm"},
        )
    except Exception as e:
        raise HTTPException(status_code=504, detail=f"PDF generation failed: {str(e)}")
    finally:
        await browser.close()
        await pw.stop()
    # Serve the file, then delete it once the response has been sent
    return FileResponse(
        path=output_path,
        filename="webpage.pdf",
        media_type="application/pdf",
        background=BackgroundTask(os.remove, output_path),
    )
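# Local run sketch; assumes uvicorn is installed and this file is named scrape.py:
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

# Example requests once the server is up:
#   curl "http://localhost:8000/seo?url=https://example.com"
#   curl -OJ "http://localhost:8000/html-to-pdf?url=https://example.com"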