# Spaces: Running
import gradio as gr | |
import requests | |
import json | |
import os | |
import time | |
from collections import defaultdict | |
from PIL import Image | |
import io | |
# Root of the JigsawStack REST API; endpoint paths are appended to it.
BASE_URL = "https://api.jigsawstack.com/v1"

# Header dict passed to every API call. The key is read from the environment
# once at import time (the value is None when the variable is unset).
headers = {"x-api-key": os.getenv("JIGSAWSTACK_API_KEY")}

# Rate limiting configuration
MAX_REQUESTS = 20  # Maximum requests per time window
TIME_WINDOW = 3600  # Time window in seconds (1 hour)
# Per-client request log: IP string -> list of time.time() stamps.
request_times = defaultdict(list)
def get_real_ip(request: gr.Request):
    """Extract the client IP using the x-forwarded-for header or a fallback.

    Args:
        request: The Gradio request for the current event, or a falsy value
            when no request information is available.

    Returns:
        The first non-empty address listed in ``x-forwarded-for`` if present,
        otherwise ``request.client.host``, otherwise the string "unknown".
    """
    if not request:
        return "unknown"

    # NOTE(review): x-forwarded-for is supplied by the caller/proxy chain;
    # confirm the deployment's proxy overwrites it before trusting it.
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        # First IP in the list is the client's
        ip = forwarded.split(",")[0].strip()
        if ip:
            return ip

    # The client record may be missing, so avoid a bare attribute chain that
    # would raise AttributeError and fall back to a sentinel instead.
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    return host or "unknown"
def check_rate_limit(request: gr.Request):
    """Decide whether the caller may make another request right now.

    Timestamps older than TIME_WINDOW are dropped from the caller's log in
    ``request_times``; if fewer than MAX_REQUESTS remain, the current time is
    recorded and the request is allowed.

    Returns:
        A ``(allowed, message)`` tuple. ``message`` is empty on success and
        explains the limit when the request is refused. When no request
        object is available the call is allowed and a diagnostic message is
        returned.
    """
    if not request:
        return True, "Rate limit check failed - no request info"

    client_ip = get_real_ip(request)
    current_time = time.time()

    # Keep only the stamps that still fall inside the sliding window.
    recent = [
        stamp
        for stamp in request_times[client_ip]
        if current_time - stamp < TIME_WINDOW
    ]
    request_times[client_ip] = recent

    if len(recent) < MAX_REQUESTS:
        # Under the limit: log this request and let it through.
        recent.append(current_time)
        return True, ""

    # Stamps are appended in time order, so the first one expires soonest.
    seconds_left = int(TIME_WINDOW - (current_time - recent[0]))
    time_remaining_minutes = round(seconds_left / 60, 1)
    time_window_minutes = round(TIME_WINDOW / 60, 1)
    return False, f"Rate limit exceeded. You can make {MAX_REQUESTS} requests per {time_window_minutes} minutes. Try again in {time_remaining_minutes} minutes."
# Upper bound, in seconds, on how long one scrape call may block a worker.
_SCRAPE_TIMEOUT_SECONDS = 120


def _parse_page_position(page_pos):
    """Coerce the UI's page value (int, float, str or None) to an int >= 1."""
    try:
        position = int(float(page_pos))
    except (TypeError, ValueError, OverflowError):
        return 1
    return max(position, 1)


def enhanced_ai_scrape(input_method, url, html, prompts_str, selector, page_pos, request: gr.Request):
    """Run one AI scrape against the JigsawStack API and format it for the UI.

    Args:
        input_method: "URL" or "HTML Content"; selects which source is sent.
        url: Page URL, required when ``input_method`` is "URL".
        html: Raw HTML, required when ``input_method`` is "HTML Content".
        prompts_str: Comma-separated element prompts (1 to 5 after trimming).
        selector: Root CSS selector; "main" is used when empty.
        page_pos: Requested page number; invalid values fall back to 1.
        request: The Gradio request, used for per-IP rate limiting.

    Returns:
        A 6-tuple matching the UI outputs: the status text followed by
        updates for the context, selectors, data, links and pagination
        components. On any failure the status carries the error message and
        the five result components are hidden.
    """

    def error_response(message):
        # Show only the message; hide every result component.
        return (message,) + tuple(gr.update(visible=False) for _ in range(5))

    # Check rate limit first
    rate_limit_ok, rate_limit_msg = check_rate_limit(request)
    if not rate_limit_ok:
        # The message already starts with "Rate limit exceeded", so it is
        # passed through as-is rather than prefixed a second time.
        return error_response(rate_limit_msg)

    try:
        # Validate element prompts
        prompts = [p.strip() for p in (prompts_str or "").split(",") if p.strip()]
        if not prompts:
            return error_response("Error: No element prompts provided.")
        if len(prompts) > 5:
            return error_response("Error: Maximum 5 element prompts allowed.")

        payload = {
            "element_prompts": prompts,
            "root_element_selector": selector or "main",
            # A digit-only string check would reject float values such as
            # 2.0 and silently request page 1, so parse numerically instead.
            "page_position": _parse_page_position(page_pos),
        }

        # Add URL or HTML based on input method
        if input_method == "URL":
            if not url or not url.strip():
                return error_response("Error: URL is required when using URL input method.")
            payload["url"] = url.strip()
        elif input_method == "HTML Content":
            if not html or not html.strip():
                return error_response("Error: HTML content is required when using HTML input method.")
            payload["html"] = html.strip()
        else:
            # Without a source the API call cannot succeed; fail locally.
            return error_response(f"Error: Unsupported input method: {input_method}")

        response = requests.post(
            f"{BASE_URL}/ai/scrape",
            headers=headers,
            json=payload,
            timeout=_SCRAPE_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        result = response.json()

        if not result.get("success"):
            return error_response(f"Error: Scraping failed - {result.get('message', 'Unknown error')}")

        # Extract all the data; `or` also covers keys present with a null value.
        context = result.get("context") or {}
        selectors = result.get("selectors") or {}
        data = result.get("data") or []
        links = result.get("link") or []
        current_page = result.get("page_position") or 1
        total_pages = result.get("page_position_length") or 1

        # Format pagination info
        pagination_text = f"Page {current_page} of {total_pages}"
        if isinstance(total_pages, int) and total_pages > 1:
            pagination_text += f" (Total pages available: {total_pages})"

        status_text = f"✅ Successfully scraped {len(data)} data items"
        if context:
            status_text += f" with {len(context)} context elements"

        # Each result panel is shown only when it has something to display.
        return (
            status_text,
            gr.update(value=context, visible=bool(context)),
            gr.update(value=selectors, visible=bool(selectors)),
            gr.update(value=data, visible=bool(data)),
            gr.update(value=links, visible=bool(links)),
            gr.update(value=pagination_text, visible=True),
        )
    except requests.exceptions.RequestException as req_err:
        # Covers connection errors, timeouts and non-2xx responses.
        return error_response(f"Request failed: {str(req_err)}")
    except Exception as e:
        # UI boundary: report the failure in the status box instead of
        # letting the event handler raise.
        return error_response(f"Unexpected error: {str(e)}")
def get_rate_limit_status(request: gr.Request):
    """Report how much of the caller's rate-limit budget is in use.

    Expired timestamps are pruned from the caller's entry in
    ``request_times`` as a side effect; no new request is recorded.

    Returns:
        ``{"error": ...}`` when no request object is available. Otherwise a
        dict with ``status``, ``current_requests``, ``max_requests`` and
        ``time_window_minutes``, plus ``time_remaining_minutes`` when the
        caller is rate limited or ``remaining_requests`` when not.
    """
    if not request:
        return {"error": "Unable to get request info"}

    client_ip = get_real_ip(request)
    current_time = time.time()

    # Clean up old timestamps
    recent = [
        stamp
        for stamp in request_times[client_ip]
        if current_time - stamp < TIME_WINDOW
    ]
    request_times[client_ip] = recent

    used = len(recent)
    limited = used >= MAX_REQUESTS

    # Fields shared by both outcomes; the outcome-specific key is added last.
    report = {
        "status": "Rate limited" if limited else "Available",
        "current_requests": used,
        "max_requests": MAX_REQUESTS,
        "time_window_minutes": round(TIME_WINDOW / 60, 1),
    }
    if limited:
        # Time until the oldest recorded request leaves the window.
        seconds_left = int(TIME_WINDOW - (current_time - recent[0]))
        report["time_remaining_minutes"] = round(seconds_left / 60, 1)
    else:
        report["remaining_requests"] = MAX_REQUESTS - used
    return report
# ----------------- Gradio UI ------------------ | |
# NOTE(review): the container nesting below is reconstructed from statement
# order (two columns inside one row, button underneath); confirm against the
# deployed layout.
with gr.Blocks() as demo:
    # Page header. The advertised limit is built from MAX_REQUESTS and
    # TIME_WINDOW so the banner matches what check_rate_limit enforces.
    gr.Markdown(
        "\n"
        "<div style='text-align: center; margin-bottom: 24px;'>\n"
        "<h1 style='font-size:2.2em; margin-bottom: 0.2em;'>🧩 AI Scraper</h1>\n"
        "<p style='font-size:1.2em; margin-top: 0;'>Extract structured data from web pages with advanced AI models.</p>\n"
        "<p style='font-size:1em; margin-top: 0.5em;'>For more details and API usage, see the <a href='https://jigsawstack.com/docs/api-reference/ai/scrape' target='_blank'>documentation</a>.</p>\n"
        f"<p style='font-size:0.9em; margin-top: 0.5em; color: #666;'>Rate limit: {MAX_REQUESTS} requests per {TIME_WINDOW // 60} minutes per IP address</p>\n"
        "</div>\n"
    )

    with gr.Row():
        # Left column: everything the user fills in.
        with gr.Column():
            gr.Markdown("#### Input Method")
            input_method_scraper = gr.Radio(
                choices=["URL", "HTML Content"],
                label="Choose Input Method",
                value="URL",
            )
            # Conditional inputs based on selection
            url_scraper = gr.Textbox(
                label="Page URL",
                placeholder="https://example.com/pricing",
                info="URL of the page to scrape",
            )
            html_content = gr.Textbox(
                label="HTML Content",
                lines=8,
                placeholder="<html>...</html>",
                visible=False,
                info="Raw HTML content to scrape",
            )

            gr.Markdown("#### Scraping Configuration")
            element_prompts = gr.Textbox(
                label="Element Prompts (comma-separated)",
                lines=3,
                placeholder="Plan title, Plan price, Features, Button text",
                info="Items to scrape (max 5). E.g., 'Plan price', 'Plan title'",
            )
            root_selector = gr.Textbox(
                label="Root Element Selector",
                value="main",
                placeholder="main, .container, #content",
                info="CSS selector to limit scraping scope (default: main)",
            )
            page_position = gr.Number(
                label="Page Position",
                value=1,
                minimum=1,
                info="For pagination, current page number (min: 1)",
            )

        # Right column: status plus result panels, hidden until populated.
        with gr.Column():
            gr.Markdown("#### Results")
            scrape_status = gr.Textbox(
                label="Status",
                interactive=False,
                placeholder="Ready to scrape...",
            )

            gr.Markdown("#### Extracted Data")
            context_output = gr.JSON(
                label="Context Data",
                visible=False,
            )
            selectors_output = gr.JSON(
                label="CSS Selectors Used",
                visible=False,
            )
            detailed_data = gr.JSON(
                label="Detailed Scrape Data",
                visible=False,
            )
            links_data = gr.JSON(
                label="Detected Links",
                visible=False,
            )

            gr.Markdown("#### Pagination Info")
            pagination_info = gr.Textbox(
                label="Page Information",
                interactive=False,
                visible=False,
            )

    scrape_btn = gr.Button("Scrape with AI", variant="primary")

    # Function to show/hide input groups based on selection
    def update_scraper_input_visibility(method):
        """Return visibility updates for (url_scraper, html_content).

        The HTML box is shown only for "HTML Content"; every other value,
        including "URL", shows the URL box.
        """
        show_html = method == "HTML Content"
        # gr.update matches how enhanced_ai_scrape toggles its outputs.
        return gr.update(visible=not show_html), gr.update(visible=show_html)

    input_method_scraper.change(
        update_scraper_input_visibility,
        inputs=input_method_scraper,
        outputs=[url_scraper, html_content],
    )

    # The gr.Request argument of enhanced_ai_scrape is not listed in
    # `inputs`; Gradio supplies it from the type annotation.
    scrape_btn.click(
        enhanced_ai_scrape,
        inputs=[input_method_scraper, url_scraper, html_content, element_prompts, root_selector, page_position],
        outputs=[scrape_status, context_output, selectors_output, detailed_data, links_data, pagination_info],
    )

demo.launch()