Spaces:
Sleeping
Sleeping
import gradio as gr | |
import requests | |
import json | |
import os | |
import time | |
from collections import defaultdict | |
# Root endpoint that every JigsawStack API path in this file is appended to.
BASE_URL = "https://api.jigsawstack.com/v1"

# Auth header sent with each API call. The value is None when the
# JIGSAWSTACK_API_KEY environment variable is not set.
headers = {"x-api-key": os.environ.get("JIGSAWSTACK_API_KEY")}

# Rate limiting configuration (kept in process memory only).
# Maps a client key to the list of timestamps of its recent requests.
request_times = defaultdict(list)
MAX_REQUESTS = 20  # Maximum requests per time window
TIME_WINDOW = 3600  # Time window in seconds (1 hour)
def get_real_ip(request: gr.Request):
    """Return the best-guess client IP for *request*.

    Resolution order:
      1. the first non-empty entry of the ``x-forwarded-for`` header,
      2. the peer address in ``request.client.host``,
      3. the literal string ``"unknown"``.

    Args:
        request: The incoming request object, or a falsy value when no
            request information is available.

    Returns:
        The IP address as a string, or ``"unknown"`` when it cannot be
        determined.

    NOTE(review): ``x-forwarded-for`` is supplied by the caller/proxy chain
    and can be spoofed unless a trusted proxy overwrites it - confirm the
    deployment sits behind one before relying on this for enforcement.
    """
    if not request:
        return "unknown"

    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        # By convention the left-most entry is the originating client.
        first_hop = forwarded.split(",")[0].strip()
        if first_hop:
            return first_hop

    # `client` may be missing or None (e.g. no peer info on the connection),
    # so avoid dereferencing `.host` blindly.
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    return host or "unknown"
def check_rate_limit(request: gr.Request):
    """Record this request against the caller's quota and report the verdict.

    Returns a ``(allowed, message)`` tuple. ``allowed`` is False only when
    the caller already has ``MAX_REQUESTS`` timestamps inside the last
    ``TIME_WINDOW`` seconds; in that case ``message`` explains the limit and
    how long to wait. When no request object is available the call is
    allowed and a diagnostic message is returned.
    """
    if not request:
        return True, "Rate limit check failed - no request info"

    client_ip = get_real_ip(request)
    current = time.time()

    # Drop timestamps that have aged out of the window.
    recent = []
    for stamp in request_times[client_ip]:
        if current - stamp < TIME_WINDOW:
            recent.append(stamp)
    request_times[client_ip] = recent

    # Still under quota: count this request and let it through.
    if len(recent) < MAX_REQUESTS:
        recent.append(current)
        return True, ""

    # Over quota: the oldest kept timestamp determines when a slot frees up.
    seconds_left = int(TIME_WINDOW - (current - recent[0]))
    minutes_left = round(seconds_left / 60, 1)
    window_minutes = round(TIME_WINDOW / 60, 1)
    return False, f"Rate limit exceeded. You can make {MAX_REQUESTS} requests per {window_minutes} minutes. Try again in {minutes_left} minutes."
# ----------------- JigsawStack API Wrappers ------------------
# (connect, read) timeouts in seconds for the vOCR HTTP call. Without a
# timeout `requests` waits indefinitely, so a stalled upstream would pin a
# worker forever.
_VOCR_TIMEOUT = (10, 120)


def _parse_page_range(page_range_str):
    """Parse a ``"start,end"`` string into ``[start, end]``.

    Raises:
        ValueError: if the text is not exactly two positive integers, if the
            start is greater than the end, or if ``end - start`` is 10 or more.
    """
    parts = [int(p.strip()) for p in page_range_str.split(',')]
    if len(parts) != 2:
        raise ValueError("Page range must be two numbers (e.g., 1,10).")
    start_page, end_page = parts
    if start_page <= 0 or end_page <= 0:
        raise ValueError("Page numbers must be positive.")
    if start_page > end_page:
        raise ValueError("Start page cannot be greater than end page.")
    if (end_page - start_page) >= 10:
        raise ValueError("Page range cannot span more than 10 pages.")
    return [start_page, end_page]


def vocr(source_type, image_url, file_store_key, prompt_str, page_range_str, request: gr.Request):
    """Validate the form inputs, call the vOCR endpoint and build the UI outputs.

    Args:
        source_type: ``"URL"`` or ``"File Store Key"``; selects which of the
            next two arguments is sent to the API.
        image_url: Image URL, used when ``source_type`` is ``"URL"``.
        file_store_key: File store key, used when ``source_type`` is
            ``"File Store Key"``.
        prompt_str: Comma-separated prompts; at least one non-empty prompt
            is required.
        page_range_str: Optional ``"start,end"`` page range.
        request: The incoming request, used only for rate limiting.

    Returns:
        A 6-tuple ``(status, image, context, tags, has_text, sections)``.
        On any failure (rate limit, validation, HTTP or unexpected error)
        ``status`` carries the error text and the last four items are
        updates that hide their components. The function does not raise.
    """
    def hidden():
        return gr.update(visible=False)

    def error_response(message, img_src):
        # Status + image, with every result component hidden.
        return (message, img_src, hidden(), hidden(), hidden(), hidden())

    # Check rate limit first
    rate_limit_ok, rate_limit_msg = check_rate_limit(request)
    if not rate_limit_ok:
        return error_response(rate_limit_msg, None)

    # Only echo a usable URL back to the image component; a blank or
    # whitespace-only value is treated as "no image".
    image_to_display = None
    if source_type == "URL" and image_url and image_url.strip():
        image_to_display = image_url.strip()

    try:
        payload = {}

        # Validate prompts - ensure a prompt is always provided.
        if not prompt_str or not prompt_str.strip():
            return error_response("Error: Prompt is required.", image_to_display)
        prompts = [p.strip() for p in prompt_str.split(',') if p.strip()]
        if not prompts:
            return error_response("Error: Prompt cannot be empty or just commas.", image_to_display)
        # The prompts are sent as a list rather than a single string.
        payload["prompt"] = prompts

        # Validate page range
        if page_range_str and page_range_str.strip():
            try:
                payload["page_range"] = _parse_page_range(page_range_str)
            except (ValueError, TypeError) as e:
                return error_response(f"Error: Invalid page range format - {e}", image_to_display)

        if source_type == "URL":
            if not image_url or not image_url.strip():
                return error_response("Error: Image URL is required.", image_to_display)
            payload["url"] = image_url.strip()
        elif source_type == "File Store Key":
            if not file_store_key or not file_store_key.strip():
                return error_response("Error: File Store Key is required.", image_to_display)
            payload["file_store_key"] = file_store_key.strip()
        else:
            return error_response("Error: Invalid image source selected.", image_to_display)

        response = requests.post(
            f"{BASE_URL}/vocr",
            headers=headers,
            json=payload,
            timeout=_VOCR_TIMEOUT,
        )
        response.raise_for_status()
        result = response.json()

        if not result.get("success"):
            return error_response(f"Error: vOCR failed - {result.get('message', 'Unknown error')}", image_to_display)

        context = result.get("context", {})
        # Tolerate a null "tags" value and non-string entries.
        tags = ", ".join(str(tag) for tag in (result.get("tags") or []))
        has_text = str(result.get("has_text", "N/A"))
        sections = result.get("sections", [])
        status = "✅ Successfully processed image with vOCR."

        return (
            status,
            image_to_display,
            gr.update(value=context, visible=bool(context)),
            gr.update(value=tags, visible=bool(tags)),
            gr.update(value=has_text, visible=True),
            gr.update(value=sections, visible=bool(sections)),
        )
    except requests.exceptions.RequestException as e:
        # Covers connection errors, timeouts and non-2xx responses.
        return error_response(f"Request failed: {str(e)}", image_to_display)
    except Exception as e:
        # UI boundary: surface anything unexpected as a status message
        # instead of letting the handler crash.
        return error_response(f"An unexpected error occurred: {str(e)}", image_to_display)
# ----------------- Gradio UI ------------------ | |
with gr.Blocks() as demo:
    # Page header: title, tagline and a link to the API documentation.
    gr.Markdown("""
<div style='text-align: center; margin-bottom: 24px;'>
<h1 style='font-size:2.2em; margin-bottom: 0.2em;'>🧩 vOCR</h1>
<p style='font-size:1.2em; margin-top: 0;'>Extract text from images with advanced AI models.</p>
<p style='font-size:1em; margin-top: 0.5em;'>For more details and API usage, see the <a href='https://jigsawstack.com/docs/api-reference/ai/vocr' target='_blank'>documentation</a>.</p>
</div>
""")

    with gr.Row():
        # Left column: request inputs.
        with gr.Column(scale=1):
            gr.Markdown("#### Image Source")
            vocr_source_type = gr.Radio(
                label="Choose Image Source",
                choices=["URL", "File Store Key"],
                value="URL",
            )
            # Exactly one of the two source boxes is visible at a time;
            # the URL box is shown initially to match the radio default.
            vocr_image_url = gr.Textbox(
                label="Image URL",
                placeholder="https://media.snopes.com/2021/08/239918331_10228097135359041_3825446756894757753_n.jpg",
                visible=True,
            )
            vocr_file_key = gr.Textbox(
                label="File Store Key",
                placeholder="your-file-store-key",
                visible=False,
            )
            vocr_prompts = gr.Textbox(
                label="Prompts (comma-separated)",
                placeholder="total_price, tax, store_name",
                info="Prompts to guide data extraction from the image.",
            )
            vocr_page_range = gr.Textbox(
                label="Page Range (Optional)",
                placeholder="e.g., 1,10",
                info="For multi-page docs. Max 10 pages.",
            )
            vocr_btn = gr.Button("Analyze Image", variant="primary")

        # Right column: results. The detail components start hidden and are
        # made visible by the updates returned from the click handler.
        with gr.Column(scale=2):
            gr.Markdown("#### Analysis Results")
            vocr_status = gr.Textbox(label="Status", interactive=False)
            vocr_image_display = gr.Image(label="Analyzed Image")
            vocr_context = gr.JSON(label="Extracted Context", visible=False)
            vocr_tags = gr.Textbox(label="Detected Tags", interactive=False, visible=False)
            vocr_has_text = gr.Textbox(label="Text Detected?", interactive=False, visible=False)
            vocr_sections = gr.JSON(label="Full OCR Sections", visible=False)

    def update_vocr_source(source_type):
        """Return visibility updates for the (URL box, file-key box) pair."""
        show_url = source_type == "URL"
        url_box = gr.update(visible=show_url)
        key_box = gr.update(visible=not show_url)
        return url_box, key_box

    # Switch the visible source box whenever the radio selection changes.
    vocr_source_type.change(
        update_vocr_source,
        inputs=vocr_source_type,
        outputs=[vocr_image_url, vocr_file_key],
    )

    # Run the analysis; the output order must match the tuple `vocr` returns.
    vocr_btn.click(
        vocr,
        inputs=[vocr_source_type, vocr_image_url, vocr_file_key, vocr_prompts, vocr_page_range],
        outputs=[vocr_status, vocr_image_display, vocr_context, vocr_tags, vocr_has_text, vocr_sections],
    )

demo.launch()