vOCR / app.py
vineet124jig's picture
Update app.py
05ce83f verified
import gradio as gr
import requests
import json
import os
import time
from collections import defaultdict
# Root of the JigsawStack REST API; endpoint paths are appended to it.
BASE_URL = "https://api.jigsawstack.com/v1"

# Headers sent with every API call. The key is read from the environment
# once, when the module is imported (None if the variable is unset).
headers = {"x-api-key": os.getenv("JIGSAWSTACK_API_KEY")}

# Rate limiting configuration
MAX_REQUESTS = 20  # Maximum requests per time window
TIME_WINDOW = 60 * 60  # Time window in seconds (1 hour)

# Per-client request timestamps, keyed by IP address.
request_times = defaultdict(list)
def get_real_ip(request: gr.Request):
    """Return the best-guess client IP address for *request*.

    The first entry of the ``x-forwarded-for`` header is preferred; when the
    header is missing or its first entry is blank, the direct peer address
    (``request.client.host``) is used instead.

    Args:
        request: The Gradio request for the current event, or a falsy value
            when no request information is available.

    Returns:
        The IP address as a string, or ``"unknown"`` when none can be
        determined.

    NOTE(review): ``x-forwarded-for`` is supplied by the caller unless a
    trusted proxy overwrites it, so it can be spoofed - confirm the
    deployment sits behind such a proxy before relying on it for limits.
    """
    if not request:
        return "unknown"

    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        # First IP in the list is the client's; later entries are proxies.
        first_hop = forwarded.split(",")[0].strip()
        if first_hop:
            return first_hop

    # Fallback: the direct peer. The client info may be absent (presumably
    # None for some transports - see the Starlette docs), so never assume
    # the attribute chain exists.
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    return host or "unknown"
def check_rate_limit(request: gr.Request):
    """Record a request and report whether its client is still within quota.

    Each client (identified by ``get_real_ip``) may make ``MAX_REQUESTS``
    requests per sliding window of ``TIME_WINDOW`` seconds.

    Returns:
        A ``(allowed, message)`` tuple. ``message`` is empty when the request
        is allowed and counted; it explains the wait time when the limit is
        hit. With no request info the call is allowed but not counted.
    """
    if not request:
        return True, "Rate limit check failed - no request info"

    client_ip = get_real_ip(request)
    now = time.time()

    # Keep only the timestamps that still fall inside the time window.
    recent = [stamp for stamp in request_times[client_ip] if now - stamp < TIME_WINDOW]
    request_times[client_ip] = recent

    if len(recent) < MAX_REQUESTS:
        # Under the limit: count this request and let it through.
        recent.append(now)
        return True, ""

    # Over the limit: the oldest surviving timestamp decides when a slot
    # frees up again.
    time_remaining = int(TIME_WINDOW - (now - recent[0]))
    time_remaining_minutes = round(time_remaining / 60, 1)
    time_window_minutes = round(TIME_WINDOW / 60, 1)
    return False, f"Rate limit exceeded. You can make {MAX_REQUESTS} requests per {time_window_minutes} minutes. Try again in {time_remaining_minutes} minutes."
# ----------------- JigsawStack API Wrappers ------------------
# (connect, read) timeouts in seconds for the vOCR HTTP call. Without a
# timeout, requests waits indefinitely on a stalled connection.
_VOCR_TIMEOUT = (10, 120)


def _parse_prompts(prompt_str):
    """Split a comma-separated prompt string into a list of non-empty prompts.

    Raises:
        ValueError: carrying the user-facing message when no prompt remains.
    """
    if not prompt_str or not prompt_str.strip():
        raise ValueError("Error: Prompt is required.")
    prompts = [p.strip() for p in prompt_str.split(",") if p.strip()]
    if not prompts:
        raise ValueError("Error: Prompt cannot be empty or just commas.")
    return prompts


def _parse_page_range(page_range_str):
    """Parse ``"start,end"`` into ``[start, end]``, validating the bounds.

    Raises:
        ValueError: when the text is not two positive integers in ascending
            order, or when the two pages are 10 or more apart.
    """
    parts = [int(p.strip()) for p in page_range_str.split(",")]
    if len(parts) != 2:
        raise ValueError("Page range must be two numbers (e.g., 1,10).")
    start_page, end_page = parts
    if not (start_page > 0 and end_page > 0):
        raise ValueError("Page numbers must be positive.")
    if start_page > end_page:
        raise ValueError("Start page cannot be greater than end page.")
    if (end_page - start_page) >= 10:
        raise ValueError("Page range cannot span more than 10 pages.")
    return [start_page, end_page]


def _build_vocr_payload(source_type, image_url, file_store_key, prompt_str, page_range_str):
    """Validate the form inputs and assemble the JSON body for the vOCR call.

    Raises:
        ValueError: carrying the complete user-facing error message.
    """
    # The API can handle an array of prompts, which is more robust
    # and avoids potential issues with the response format.
    payload = {"prompt": _parse_prompts(prompt_str)}

    if page_range_str and page_range_str.strip():
        try:
            payload["page_range"] = _parse_page_range(page_range_str)
        except (ValueError, TypeError) as e:
            raise ValueError(f"Error: Invalid page range format - {e}") from e

    if source_type == "URL":
        if not image_url or not image_url.strip():
            raise ValueError("Error: Image URL is required.")
        payload["url"] = image_url.strip()
    elif source_type == "File Store Key":
        if not file_store_key or not file_store_key.strip():
            raise ValueError("Error: File Store Key is required.")
        payload["file_store_key"] = file_store_key.strip()
    else:
        raise ValueError("Error: Invalid image source selected.")
    return payload


def vocr(source_type, image_url, file_store_key, prompt_str, page_range_str, request: gr.Request):
    """Run the vOCR endpoint on an image and format the result for the UI.

    Args:
        source_type: ``"URL"`` or ``"File Store Key"``; selects which of the
            next two arguments identifies the image.
        image_url: Image URL, used when ``source_type`` is ``"URL"``.
        file_store_key: File store key, used for ``"File Store Key"``.
        prompt_str: Comma-separated prompts; at least one is required.
        page_range_str: Optional ``"start,end"`` page range.
        request: The Gradio request, used for per-client rate limiting.

    Returns:
        A 6-tuple of (status, image, context, tags, has_text, sections).
        On any failure the status carries the error message and the last
        four outputs are hidden; this function does not raise.
    """
    def error_response(message, img_src):
        return (
            message,                   # status
            img_src,                   # image
            gr.update(visible=False),  # context JSON
            gr.update(visible=False),  # tags
            gr.update(visible=False),  # has_text
            gr.update(visible=False),  # sections JSON
        )

    # Check rate limit first
    rate_limit_ok, rate_limit_msg = check_rate_limit(request)
    if not rate_limit_ok:
        return error_response(rate_limit_msg, None)

    # Only a non-blank URL is previewed, stripped like the value sent to the
    # API, so the image output never receives an empty string.
    image_to_display = None
    if source_type == "URL" and image_url and image_url.strip():
        image_to_display = image_url.strip()

    try:
        # Only validation errors are caught here: response.json() below can
        # also raise ValueError and must reach the outer handlers instead.
        try:
            payload = _build_vocr_payload(
                source_type, image_url, file_store_key, prompt_str, page_range_str
            )
        except ValueError as e:
            return error_response(str(e), image_to_display)

        response = requests.post(
            f"{BASE_URL}/vocr",
            headers=headers,
            json=payload,
            timeout=_VOCR_TIMEOUT,
        )
        response.raise_for_status()
        result = response.json()
        if not result.get("success"):
            return error_response(
                f"Error: vOCR failed - {result.get('message', 'Unknown error')}",
                image_to_display,
            )

        # "or" fallbacks also cover keys that are present but null, which
        # would otherwise make the join below raise on a successful call.
        context = result.get("context") or {}
        tags = ", ".join(str(tag) for tag in (result.get("tags") or []))
        has_text = str(result.get("has_text", "N/A"))
        sections = result.get("sections") or []
        status = "✅ Successfully processed image with vOCR."
        return (
            status,
            image_to_display,
            gr.update(value=context, visible=bool(context)),
            gr.update(value=tags, visible=bool(tags)),
            gr.update(value=has_text, visible=True),
            gr.update(value=sections, visible=bool(sections)),
        )
    except requests.exceptions.RequestException as e:
        # Includes timeouts, connection errors and HTTP error statuses.
        return error_response(f"Request failed: {str(e)}", image_to_display)
    except Exception as e:
        # UI boundary: report anything else in the status box, never crash.
        return error_response(f"An unexpected error occurred: {str(e)}", image_to_display)
# ----------------- Gradio UI ------------------
# Header markup shown at the top of the page.
_HEADER_MARKDOWN = """
<div style='text-align: center; margin-bottom: 24px;'>
<h1 style='font-size:2.2em; margin-bottom: 0.2em;'>🧩 vOCR</h1>
<p style='font-size:1.2em; margin-top: 0;'>Extract text from images with advanced AI models.</p>
<p style='font-size:1em; margin-top: 0.5em;'>For more details and API usage, see the <a href='https://jigsawstack.com/docs/api-reference/ai/vocr' target='_blank'>documentation</a>.</p>
</div>
"""


def update_vocr_source(source_type):
    """Show the URL box for the "URL" source, the file-key box otherwise."""
    show_url = source_type == "URL"
    url_box_update = gr.update(visible=show_url)
    key_box_update = gr.update(visible=not show_url)
    return url_box_update, key_box_update


with gr.Blocks() as demo:
    gr.Markdown(_HEADER_MARKDOWN)

    with gr.Row():
        # Left column: image source selection and request options.
        with gr.Column(scale=1):
            gr.Markdown("#### Image Source")
            vocr_source_type = gr.Radio(
                label="Choose Image Source",
                choices=["URL", "File Store Key"],
                value="URL",
            )
            vocr_image_url = gr.Textbox(
                label="Image URL",
                placeholder="https://media.snopes.com/2021/08/239918331_10228097135359041_3825446756894757753_n.jpg",
                visible=True,
            )
            vocr_file_key = gr.Textbox(
                label="File Store Key",
                placeholder="your-file-store-key",
                visible=False,
            )
            vocr_prompts = gr.Textbox(
                label="Prompts (comma-separated)",
                placeholder="total_price, tax, store_name",
                info="Prompts to guide data extraction from the image.",
            )
            vocr_page_range = gr.Textbox(
                label="Page Range (Optional)",
                placeholder="e.g., 1,10",
                info="For multi-page docs. Max 10 pages.",
            )
            vocr_btn = gr.Button("Analyze Image", variant="primary")

        # Right column: status line plus result panels, hidden until filled.
        with gr.Column(scale=2):
            gr.Markdown("#### Analysis Results")
            vocr_status = gr.Textbox(label="Status", interactive=False)
            vocr_image_display = gr.Image(label="Analyzed Image")
            vocr_context = gr.JSON(label="Extracted Context", visible=False)
            vocr_tags = gr.Textbox(label="Detected Tags", interactive=False, visible=False)
            vocr_has_text = gr.Textbox(label="Text Detected?", interactive=False, visible=False)
            vocr_sections = gr.JSON(label="Full OCR Sections", visible=False)

    # Toggle which source input is visible when the radio selection changes.
    vocr_source_type.change(
        update_vocr_source,
        inputs=vocr_source_type,
        outputs=[vocr_image_url, vocr_file_key],
    )

    # Run the analysis; output order matches the tuple returned by vocr().
    vocr_btn.click(
        vocr,
        inputs=[
            vocr_source_type,
            vocr_image_url,
            vocr_file_key,
            vocr_prompts,
            vocr_page_range,
        ],
        outputs=[
            vocr_status,
            vocr_image_display,
            vocr_context,
            vocr_tags,
            vocr_has_text,
            vocr_sections,
        ],
    )

demo.launch()