from fastapi import FastAPI, HTTPException, Request, Response |
|
from pydantic import BaseModel |
|
from typing import Optional |
|
import base64 |
|
import json |
|
import asyncio |
|
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError |
|
from fastapi.responses import FileResponse |
|
import os |
|
import uuid |
from urllib.parse import urlparse
from starlette.background import BackgroundTask
|
|
|
app = FastAPI(title="Web Analyzer API") |
|
|
|
|
|
class ScreenshotResponse(BaseModel): |
|
screenshot: str |
|
|
|
class MetadataResponse(BaseModel): |
|
title: Optional[str] |
|
description: Optional[str] |
|
og: dict |
|
twitter: dict |
|
canonical: Optional[str] |
|
|
|
|
|
async def timeout_wrapper(coro, timeout=20): |
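    """Await `coro` and convert an asyncio timeout into an HTTP 504 error."""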
|
try: |
|
return await asyncio.wait_for(coro, timeout) |
|
except asyncio.TimeoutError: |
|
raise HTTPException(status_code=504, detail="Operation timed out") |
|
|
|
|
|
async def get_page(url): |
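    """Start Playwright, open `url` in headless Chromium, and return (page, browser, playwright).

    The caller is responsible for closing the browser and stopping Playwright.
    """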
|
print(f"[INFO] Visiting URL: {url}") |
|
|
|
pw = await async_playwright().start() |
|
browser = await pw.chromium.launch(headless=True) |
|
context = await browser.new_context() |
|
|
|
|
|
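    # Hide the navigator.webdriver flag that some sites use to detect automation.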
await context.add_init_script( |
|
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" |
|
) |
|
|
|
page = await context.new_page() |
|
page.set_default_timeout(20000) |
|
|
|
try: |
|
try: |
|
print("[INFO] Trying to load with 'domcontentloaded'") |
|
await page.goto(url, wait_until="domcontentloaded", timeout=20000) |
|
except PlaywrightTimeoutError: |
|
print("[WARN] domcontentloaded failed, trying 'load'") |
|
await page.goto(url, wait_until="load", timeout=20000) |
|
|
|
try: |
|
await page.wait_for_selector("body", timeout=5000) |
|
except Exception: |
|
print("[WARN] <body> not found quickly. May still continue.") |
|
|
|
except Exception as e: |
|
print(f"[ERROR] Page load failed for {url}: {e}") |
|
await browser.close() |
|
await pw.stop() |
|
raise HTTPException(status_code=504, detail=f"Page load failed: {str(e)}") |
|
|
|
print("[INFO] Page loaded successfully.") |
|
return page, browser, pw |
|
|
|
|
@app.middleware("http") |
|
async def remove_leaky_headers(request: Request, call_next): |
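    """Strip infrastructure headers from every response and rewrite the Server header."""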
|
response: Response = await call_next(request) |
|
|
|
|
|
for header in [ |
|
"link", |
|
"x-proxied-host", |
|
"x-proxied-path", |
|
"x-proxied-replica", |
|
"server" |
|
]: |
|
try: |
|
del response.headers[header] |
|
except KeyError: |
|
pass |
|
|
|
|
|
response.headers["server"] = "Webrify-Secure-Gateway" |
|
return response |
|
|
|
|
|
@app.get("/metadata", response_model=MetadataResponse) |
|
async def get_metadata(url: str): |
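    """Return the page title, meta description, Open Graph / Twitter card tags, and canonical URL."""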
|
page, browser, pw = await get_page(url) |
|
try: |
|
title = await page.title() |
|
|
|
|
|
        # Guard with query_selector so a missing tag doesn't stall for the default timeout.
        desc = None
        try:
            if await page.query_selector("meta[name='description']"):
                desc = await page.get_attribute("meta[name='description']", "content")
        except Exception:
            desc = None
|
|
|
|
|
og = {} |
|
for prop in ["title", "description", "image"]: |
|
try: |
|
selector = f"meta[property='og:{prop}']" |
|
if await page.query_selector(selector): |
|
og[f"og:{prop}"] = await page.get_attribute(selector, "content") |
|
else: |
|
og[f"og:{prop}"] = None |
|
except Exception: |
|
og[f"og:{prop}"] = None |
|
|
|
|
|
twitter = {} |
|
for prop in ["title", "description", "image"]: |
|
try: |
|
selector = f"meta[name='twitter:{prop}']" |
|
if await page.query_selector(selector): |
|
twitter[f"twitter:{prop}"] = await page.get_attribute(selector, "content") |
|
else: |
|
twitter[f"twitter:{prop}"] = None |
|
except Exception: |
|
twitter[f"twitter:{prop}"] = None |
|
|
|
|
|
        canonical = None
        try:
            if await page.query_selector("link[rel='canonical']"):
                canonical = await page.get_attribute("link[rel='canonical']", "href")
        except Exception:
            canonical = None
|
return { |
|
"title": title, |
|
"description": desc, |
|
"og": og, |
|
"twitter": twitter, |
|
"canonical": canonical |
|
} |
|
finally: |
|
await browser.close() |
|
await pw.stop() |
|
|
|
|
@app.get("/screenshot", response_model=ScreenshotResponse) |
|
async def get_screenshot(url: str): |
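    """Capture a full-page screenshot and return it as a base64-encoded PNG string."""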
|
page, browser, pw = await get_page(url) |
|
try: |
|
|
|
        # get_page() has already navigated, so wait for the network to settle instead of
        # issuing a second goto(); a page that never goes idle shouldn't fail the request.
        try:
            await page.wait_for_load_state("networkidle", timeout=90000)
        except PlaywrightTimeoutError:
            print("[WARN] networkidle not reached, taking the screenshot anyway.")

        try:
            await page.wait_for_selector("header", timeout=10000)
        except Exception:
            pass
|
|
|
|
|
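        # Unpin sticky/fixed headers so they don't repeat throughout the full-page capture.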
await page.add_style_tag(content=""" |
|
* { |
|
scroll-behavior: auto !important; |
|
} |
|
header, .sticky, .fixed, [style*="position:fixed"] { |
|
position: static !important; |
|
top: auto !important; |
|
} |
|
""") |
|
|
|
|
|
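        # Scroll through the whole page in small steps to trigger lazy-loaded content.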
await page.evaluate(""" |
|
() => { |
|
return new Promise((resolve) => { |
|
let totalHeight = 0; |
|
const distance = 100; |
|
const timer = setInterval(() => { |
|
window.scrollBy(0, distance); |
|
totalHeight += distance; |
|
if (totalHeight >= document.body.scrollHeight) { |
|
clearInterval(timer); |
|
resolve(); |
|
} |
|
}, 100); |
|
}); |
|
} |
|
""") |
|
|
|
|
|
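        # Give late-loading assets a moment to settle before capturing.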
await page.wait_for_timeout(2000) |
|
|
|
|
|
image_bytes = await page.screenshot(full_page=True) |
|
image_base64 = base64.b64encode(image_bytes).decode() |
|
|
|
return {"screenshot": image_base64} |
|
finally: |
|
await browser.close() |
|
await pw.stop() |
|
|
|
|
|
@app.get("/seo") |
|
async def seo_audit(url: str): |
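    """Run a lightweight SEO audit: H1 count, images missing alt text, internal/external link counts, robots meta, and canonical presence."""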
|
page, browser, pw = await get_page(url) |
|
try: |
|
h1_count = await page.locator("h1").count() |
|
imgs = await page.query_selector_all("img") |
|
missing_alts = [await img.get_attribute("src") for img in imgs if not await img.get_attribute("alt")] |
|
        anchors = await page.query_selector_all("a[href]")
        internal, external = 0, 0
        base_host = urlparse(url).netloc
        for a in anchors:
            href = await a.get_attribute("href")
            if not href:
                continue
            if href.startswith("http"):
                # Compare hostnames rather than a substring check against the input URL.
                if urlparse(href).netloc == base_host:
                    internal += 1
                else:
                    external += 1
            elif not href.startswith(("mailto:", "tel:", "javascript:", "#")):
                # Relative links resolve to the same site.
                internal += 1
|
        # query_selector guards keep missing tags from stalling for the default timeout.
        robots = None
        try:
            if await page.query_selector("meta[name='robots']"):
                robots = await page.get_attribute("meta[name='robots']", "content")
        except Exception:
            robots = None

        canonical = None
        try:
            if await page.query_selector("link[rel='canonical']"):
                canonical = await page.get_attribute("link[rel='canonical']", "href")
        except Exception:
            canonical = None
|
return { |
|
"h1_count": h1_count, |
|
"missing_image_alts": missing_alts, |
|
"internal_links": internal, |
|
"external_links": external, |
|
"robots_meta": robots, |
|
"has_canonical": bool(canonical) |
|
} |
|
finally: |
|
await browser.close() |
|
await pw.stop() |
|
|
|
@app.get("/performance") |
|
async def performance_metrics(url: str): |
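    """Collect basic timing metrics: navigation duration, first contentful paint, largest contentful paint, and cumulative layout shift."""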
|
page, browser, pw = await get_page(url) |
|
try: |
|
|
|
try: |
|
nav_timing = await page.evaluate("JSON.stringify(performance.getEntriesByType('navigation'))") |
|
timing = json.loads(nav_timing)[0] if nav_timing else {} |
|
page_load_time = timing.get('duration', None) |
|
except Exception: |
|
page_load_time = None |
|
|
|
|
|
try: |
|
fcp = await page.evaluate("performance.getEntriesByName('first-contentful-paint')[0]?.startTime") |
|
except Exception: |
|
fcp = None |
|
|
|
|
|
        # LCP and layout-shift entries are only delivered to a PerformanceObserver,
        # so buffered observers are used here; getEntriesByType() returns nothing for them.
        try:
            lcp = await page.evaluate(
                "() => new Promise(r => { new PerformanceObserver(l => { const e = l.getEntries().pop(); r(e ? (e.renderTime || e.loadTime) : null); }).observe({type: 'largest-contentful-paint', buffered: true}); setTimeout(() => r(null), 1000); })"
            )
        except Exception:
            lcp = None

        try:
            cls = await page.evaluate(
                "() => new Promise(r => { let t = 0; new PerformanceObserver(l => { for (const e of l.getEntries()) if (!e.hadRecentInput) t += e.value; }).observe({type: 'layout-shift', buffered: true}); setTimeout(() => r(t), 500); })"
            )
        except Exception:
            cls = None
|
|
|
return { |
|
"page_load_time_ms": page_load_time, |
|
"first_contentful_paint": fcp, |
|
"largest_contentful_paint": lcp, |
|
"cumulative_layout_shift": cls |
|
} |
|
finally: |
|
await browser.close() |
|
await pw.stop() |
|
|
|
|
|
@app.get("/structured-data") |
|
async def structured_data(url: str): |
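    """Extract JSON-LD structured data from the page and report the schema types found."""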
|
page, browser, pw = await get_page(url) |
|
try: |
|
scripts = await page.query_selector_all("script[type='application/ld+json']") |
|
json_ld_list = [] |
|
for s in scripts: |
|
            # textContent is more reliable than innerText for non-rendered <script> tags.
            text = await s.text_content() or ""
|
try: |
|
data = json.loads(text) |
|
json_ld_list.append(data) |
|
except Exception: |
|
continue |
|
        types = []
        for obj in json_ld_list:
            # A JSON-LD block may hold a single object or a list of objects.
            items = obj if isinstance(obj, list) else [obj]
            for item in items:
                if isinstance(item, dict) and "@type" in item:
                    types.append(item["@type"])
|
return { |
|
"schema_found": bool(json_ld_list), |
|
"types": types, |
|
"schema": json_ld_list |
|
} |
|
finally: |
|
await browser.close() |
|
await pw.stop() |
|
|
|
|
|
@app.get("/accessibility") |
|
async def accessibility_check(url: str): |
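    """Run a minimal accessibility check: images missing alt text, unlabeled buttons, and landmark elements."""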
|
page, browser, pw = await get_page(url) |
|
try: |
|
imgs = await page.query_selector_all("img") |
|
missing_alt = len([img for img in imgs if not await img.get_attribute("alt")]) |
|
buttons = await page.query_selector_all("button") |
|
        missing_labels = len([b for b in buttons if not await b.get_attribute("aria-label") and not (await b.inner_text()).strip()])
|
landmarks = [] |
|
for tag in ["main", "nav", "footer", "header"]: |
|
if await page.query_selector(tag): |
|
landmarks.append(tag) |
|
return { |
|
"images_missing_alt": missing_alt, |
|
"buttons_missing_label": missing_labels, |
|
"landmarks": landmarks |
|
} |
|
finally: |
|
await browser.close() |
|
await pw.stop() |
|
|
|
|
|
|
|
|
|
@app.get("/html-to-pdf") |
|
async def convert_html_to_pdf(url: str):
    """Render the page in headless Chromium and return it as an A4 PDF download."""
    filename = f"{uuid.uuid4().hex}.pdf"
    output_path = f"/tmp/{filename}"

    pw = await async_playwright().start()
    browser = await pw.chromium.launch()
    page = await browser.new_page()

    try:
        await page.goto(url, wait_until="networkidle")
        await page.pdf(
            path=output_path,
            format="A4",
            print_background=True,
            margin={"top": "1cm", "bottom": "1cm", "left": "1cm", "right": "1cm"},
        )
    except PlaywrightTimeoutError as e:
        raise HTTPException(status_code=504, detail=f"PDF rendering timed out: {e}")
    finally:
        await browser.close()
        await pw.stop()

    # Delete the temporary file once the response has been sent.
    return FileResponse(
        path=output_path,
        filename="webpage.pdf",
        media_type="application/pdf",
        headers={"Content-Disposition": "attachment; filename=webpage.pdf"},
        background=BackgroundTask(os.remove, output_path),
    )
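

# Local development entry point; assumes uvicorn is installed alongside FastAPI
# (adjust host/port as needed).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)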