import gradio as gr
import requests
import os
import time
from collections import defaultdict

BASE_URL = "https://api.jigsawstack.com/v1"
headers = {
    "x-api-key": os.getenv("JIGSAWSTACK_API_KEY")
}
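# NOTE: if JIGSAWSTACK_API_KEY is unset, os.getenv returns None and the API
# will reject requests as unauthorized. Set it in the environment (for a
# Hugging Face Space, as a secret) before launching.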
# Rate limiting configuration
request_times = defaultdict(list)
MAX_REQUESTS = 20 # Maximum requests per time window
TIME_WINDOW = 3600 # Time window in seconds (1 hour)
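# The limiter below is a simple in-memory sliding window: for each client IP we
# keep the timestamps of its recent requests and refuse new ones once
# MAX_REQUESTS of them fall within the last TIME_WINDOW seconds. State lives in
# this process only, so it resets on restart and is not shared across replicas.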
def get_real_ip(request: gr.Request):
    """Extract the real client IP from the x-forwarded-for header, with a fallback."""
    if not request:
        return "unknown"
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        # The first IP in the comma-separated list is the originating client
        ip = forwarded.split(",")[0].strip()
    else:
        ip = request.client.host  # direct connection, no proxy in front
    return ip
def check_rate_limit(request: gr.Request):
    """Check whether the current request exceeds the rate limit.

    Returns (allowed, message). If no request info is available, we fail open
    and allow the request rather than blocking it.
    """
    if not request:
        return True, ""
    ip = get_real_ip(request)
    now = time.time()
    # Drop timestamps that have aged out of the time window
    request_times[ip] = [t for t in request_times[ip] if now - t < TIME_WINDOW]
    # Check whether the limit is exceeded
    if len(request_times[ip]) >= MAX_REQUESTS:
        time_remaining = int(TIME_WINDOW - (now - request_times[ip][0]))
        time_remaining_minutes = round(time_remaining / 60, 1)
        time_window_minutes = round(TIME_WINDOW / 60, 1)
        return False, (
            f"Rate limit exceeded. You can make {MAX_REQUESTS} requests per "
            f"{time_window_minutes} minutes. Try again in {time_remaining_minutes} minutes."
        )
    # Record the current request
    request_times[ip].append(now)
    return True, ""
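# Example: with MAX_REQUESTS = 20 and TIME_WINDOW = 3600, a client's 21st
# request inside an hour gets back (False, "Rate limit exceeded. ..."); once the
# oldest timestamp ages past the hour, calls return (True, "") again.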
def enhanced_ai_scrape(input_method, url, html, prompts_str, selector, page_pos, request: gr.Request):
    def error_response(message):
        # One status string plus hidden updates for the five result components
        return (
            message,
            gr.update(visible=False),
            gr.update(visible=False),
            gr.update(visible=False),
            gr.update(visible=False),
            gr.update(visible=False),
        )

    # Check the rate limit before doing any work
    rate_limit_ok, rate_limit_msg = check_rate_limit(request)
    if not rate_limit_ok:
        # rate_limit_msg already starts with "Rate limit exceeded", so pass it through
        return error_response(rate_limit_msg)
    try:
        # Validate element prompts
        prompts = [p.strip() for p in prompts_str.split(",") if p.strip()]
        if not prompts:
            return error_response("Error: No element prompts provided.")
        if len(prompts) > 5:
            return error_response("Error: Maximum 5 element prompts allowed.")

        # gr.Number passes a float (e.g. 1.0), for which str(...).isdigit() is
        # False, so coerce with int() instead and clamp to a minimum of 1
        try:
            page_position_value = max(1, int(page_pos))
        except (TypeError, ValueError):
            page_position_value = 1

        payload = {
            "element_prompts": prompts,
            "root_element_selector": selector or "main",
            "page_position": page_position_value,
        }

        # Add URL or HTML depending on the selected input method
        if input_method == "URL":
            if not url or not url.strip():
                return error_response("Error: URL is required when using URL input method.")
            payload["url"] = url.strip()
        elif input_method == "HTML Content":
            if not html or not html.strip():
                return error_response("Error: HTML content is required when using HTML input method.")
            payload["html"] = html.strip()
        # timeout added as a safeguard (the value is a guess; tune as needed)
        # so a stalled scrape fails instead of hanging forever
        response = requests.post(
            f"{BASE_URL}/ai/scrape", headers=headers, json=payload, timeout=120
        )
        response.raise_for_status()  # surface 4xx/5xx as RequestException below
        result = response.json()

        if not result.get("success"):
            return error_response(f"Error: Scraping failed - {result.get('message', 'Unknown error')}")

        # Extract the response fields
        context = result.get("context", {})
        selectors = result.get("selectors", {})
        data = result.get("data", [])
        links = result.get("link", [])
        current_page = result.get("page_position", 1)
        total_pages = result.get("page_position_length", 1)

        pagination_text = f"Page {current_page} of {total_pages}"

        status_text = f"✅ Successfully scraped {len(data)} data items"
        if context:
            status_text += f" with {len(context)} context elements"

        return (
            status_text,
            gr.update(value=context, visible=bool(context)),
            gr.update(value=selectors, visible=bool(selectors)),
            gr.update(value=data, visible=bool(data)),
            gr.update(value=links, visible=bool(links)),
            gr.update(value=pagination_text, visible=True),
        )
    except requests.exceptions.RequestException as req_err:
        return error_response(f"Request failed: {req_err}")
    except Exception as e:
        return error_response(f"Unexpected error: {e}")
def get_rate_limit_status(request: gr.Request):
    """Return the caller's current rate-limit status as a dict."""
    if not request:
        return {"error": "Unable to get request info"}
    ip = get_real_ip(request)
    now = time.time()
    # Drop timestamps that have aged out of the time window
    request_times[ip] = [t for t in request_times[ip] if now - t < TIME_WINDOW]
    current_requests = len(request_times[ip])
    time_window_minutes = round(TIME_WINDOW / 60, 1)
    if current_requests >= MAX_REQUESTS:
        time_remaining = int(TIME_WINDOW - (now - request_times[ip][0]))
        time_remaining_minutes = round(time_remaining / 60, 1)
        return {
            "status": "Rate limited",
            "current_requests": current_requests,
            "max_requests": MAX_REQUESTS,
            "time_window_minutes": time_window_minutes,
            "time_remaining_minutes": time_remaining_minutes,
        }
    return {
        "status": "Available",
        "current_requests": current_requests,
        "max_requests": MAX_REQUESTS,
        "time_window_minutes": time_window_minutes,
        "remaining_requests": MAX_REQUESTS - current_requests,
    }
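# Note: get_rate_limit_status is defined but not wired into the UI below. One
# hypothetical way to expose it (not part of the original layout) would be to
# add, inside the Blocks context:
#
#   status_btn = gr.Button("Check rate limit")
#   status_json = gr.JSON(label="Rate Limit Status")
#   status_btn.click(get_rate_limit_status, inputs=None, outputs=status_json)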
# ----------------- Gradio UI ------------------
with gr.Blocks() as demo:
    gr.Markdown(f"""
    <div style='text-align: center; margin-bottom: 24px;'>
        <h1 style='font-size:2.2em; margin-bottom: 0.2em;'>🧩 AI Scraper</h1>
        <p style='font-size:1.2em; margin-top: 0;'>Extract structured data from web pages with advanced AI models.</p>
        <p style='font-size:1em; margin-top: 0.5em;'>For more details and API usage, see the <a href='https://jigsawstack.com/docs/api-reference/ai/scrape' target='_blank'>documentation</a>.</p>
        <p style='font-size:0.9em; margin-top: 0.5em; color: #666;'>Rate limit: {MAX_REQUESTS} requests per hour per IP address</p>
    </div>
    """)
    with gr.Row():
        with gr.Column():
            gr.Markdown("#### Input Method")
            input_method_scraper = gr.Radio(
                choices=["URL", "HTML Content"],
                label="Choose Input Method",
                value="URL"
            )
            # Conditional inputs based on selection
            url_scraper = gr.Textbox(
                label="Page URL",
                placeholder="https://example.com/pricing",
                info="URL of the page to scrape"
            )
            html_content = gr.Textbox(
                label="HTML Content",
                lines=8,
                placeholder="<html>...</html>",
                visible=False,
                info="Raw HTML content to scrape"
            )
            gr.Markdown("#### Scraping Configuration")
            element_prompts = gr.Textbox(
                label="Element Prompts (comma-separated)",
                lines=3,
                placeholder="Plan title, Plan price, Features, Button text",
                info="Items to scrape (max 5), e.g. 'Plan price, Plan title'"
            )
            root_selector = gr.Textbox(
                label="Root Element Selector",
                value="main",
                placeholder="main, .container, #content",
                info="CSS selector to limit scraping scope (default: main)"
            )
            page_position = gr.Number(
                label="Page Position",
                value=1,
                minimum=1,
                info="For pagination, current page number (min: 1)"
            )
        with gr.Column():
            gr.Markdown("#### Results")
            scrape_status = gr.Textbox(
                label="Status",
                interactive=False,
                placeholder="Ready to scrape..."
            )
            gr.Markdown("#### Extracted Data")
            context_output = gr.JSON(
                label="Context Data",
                visible=False
            )
            selectors_output = gr.JSON(
                label="CSS Selectors Used",
                visible=False
            )
            detailed_data = gr.JSON(
                label="Detailed Scrape Data",
                visible=False
            )
            links_data = gr.JSON(
                label="Detected Links",
                visible=False
            )
            gr.Markdown("#### Pagination Info")
            pagination_info = gr.Textbox(
                label="Page Information",
                interactive=False,
                visible=False
            )

    scrape_btn = gr.Button("Scrape with AI", variant="primary")
    # Show or hide the URL and HTML inputs based on the selected input method
    def update_scraper_input_visibility(method):
        if method == "HTML Content":
            return gr.update(visible=False), gr.update(visible=True)
        # Default (including "URL") shows the URL box
        return gr.update(visible=True), gr.update(visible=False)

    input_method_scraper.change(
        update_scraper_input_visibility,
        inputs=input_method_scraper,
        outputs=[url_scraper, html_content]
    )
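    # Note: enhanced_ai_scrape's trailing `request: gr.Request` parameter is not
    # listed in `inputs` below; Gradio injects it automatically from the type hint.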
    scrape_btn.click(
        enhanced_ai_scrape,
        inputs=[input_method_scraper, url_scraper, html_content, element_prompts, root_selector, page_position],
        outputs=[scrape_status, context_output, selectors_output, detailed_data, links_data, pagination_info],
    )
demo.launch()