Spaces:
Sleeping
Sleeping
File size: 8,856 Bytes
ad6bc96 6151e93 ad6bc96 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
import gradio as gr
import requests
import json
import os
import time
from collections import defaultdict
BASE_URL = "https://api.jigsawstack.com/v1"
headers = {
"x-api-key": os.getenv("JIGSAWSTACK_API_KEY")
}
# Rate limiting configuration
request_times = defaultdict(list)
MAX_REQUESTS = 20 # Maximum requests per time window
TIME_WINDOW = 3600 # Time window in seconds (1 hour)
def get_real_ip(request: gr.Request):
"""Extract real IP address using x-forwarded-for header or fallback"""
if not request:
return "unknown"
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
ip = forwarded.split(",")[0].strip() # First IP in the list is the client's
else:
ip = request.client.host # fallback
return ip
def check_rate_limit(request: gr.Request):
"""Check if the current request exceeds rate limits"""
if not request:
return True, "Rate limit check failed - no request info"
ip = get_real_ip(request)
now = time.time()
# Clean up old timestamps outside the time window
request_times[ip] = [t for t in request_times[ip] if now - t < TIME_WINDOW]
# Check if rate limit exceeded
if len(request_times[ip]) >= MAX_REQUESTS:
time_remaining = int(TIME_WINDOW - (now - request_times[ip][0]))
time_remaining_minutes = round(time_remaining / 60, 1)
time_window_minutes = round(TIME_WINDOW / 60, 1)
return False, f"Rate limit exceeded. You can make {MAX_REQUESTS} requests per {time_window_minutes} minutes. Try again in {time_remaining_minutes} minutes."
# Add current request timestamp
request_times[ip].append(now)
return True, ""
# ----------------- JigsawStack API Wrasppers ------------------
def vocr(source_type, image_url, file_store_key, prompt_str, page_range_str, request: gr.Request):
# Check rate limit first
rate_limit_ok, rate_limit_msg = check_rate_limit(request)
if not rate_limit_ok:
return (
rate_limit_msg, # status
None, # image
gr.update(visible=False), # context JSON
gr.update(visible=False), # tags
gr.update(visible=False), # has_text
gr.update(visible=False), # sections JSON
)
def error_response(message, img_src):
return (
message,
img_src,
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False)
)
image_to_display = image_url if source_type == "URL" else None
try:
payload = {}
# Validate prompts - ensure a prompt is always provided.
if not prompt_str or not prompt_str.strip():
return error_response("Error: Prompt is required.", image_to_display)
prompts = [p.strip() for p in prompt_str.split(',') if p.strip()]
if not prompts:
return error_response("Error: Prompt cannot be empty or just commas.", image_to_display)
# The API can handle an array of prompts, which is more robust
# and avoids potential issues with the response format.
payload["prompt"] = prompts
# Validate page range
if page_range_str and page_range_str.strip():
try:
parts = [int(p.strip()) for p in page_range_str.split(',')]
if len(parts) != 2:
raise ValueError("Page range must be two numbers (e.g., 1,10).")
start_page, end_page = parts
if not (start_page > 0 and end_page > 0):
raise ValueError("Page numbers must be positive.")
if start_page > end_page:
raise ValueError("Start page cannot be greater than end page.")
if (end_page - start_page) >= 10:
raise ValueError("Page range cannot span more than 10 pages.")
payload["page_range"] = [start_page, end_page]
except (ValueError, TypeError) as e:
return error_response(f"Error: Invalid page range format - {e}", image_to_display)
if source_type == "URL":
if not image_url or not image_url.strip():
return error_response("Error: Image URL is required.", image_to_display)
payload["url"] = image_url.strip()
elif source_type == "File Store Key":
if not file_store_key or not file_store_key.strip():
return error_response("Error: File Store Key is required.", image_to_display)
payload["file_store_key"] = file_store_key.strip()
else:
return error_response("Error: Invalid image source selected.", image_to_display)
response = requests.post(f"{BASE_URL}/vocr", headers=headers, json=payload)
response.raise_for_status()
result = response.json()
if not result.get("success"):
return error_response(f"Error: vOCR failed - {result.get('message', 'Unknown error')}", image_to_display)
context = result.get("context", {})
tags = ", ".join(result.get("tags", []))
has_text = str(result.get("has_text", "N/A"))
sections = result.get("sections", [])
status = "✅ Successfully processed image with vOCR."
return (
status,
image_to_display,
gr.update(value=context, visible=True if context else False),
gr.update(value=tags, visible=True if tags else False),
gr.update(value=has_text, visible=True),
gr.update(value=sections, visible=True if sections else False)
)
except requests.exceptions.RequestException as e:
return error_response(f"Request failed: {str(e)}", image_to_display)
except Exception as e:
return error_response(f"An unexpected error occurred: {str(e)}", image_to_display)
# ----------------- Gradio UI ------------------
with gr.Blocks() as demo:
gr.Markdown("""
<div style='text-align: center; margin-bottom: 24px;'>
<h1 style='font-size:2.2em; margin-bottom: 0.2em;'>🧩 vOCR</h1>
<p style='font-size:1.2em; margin-top: 0;'>Extract text from images with advanced AI models.</p>
<p style='font-size:1em; margin-top: 0.5em;'>For more details and API usage, see the <a href='https://jigsawstack.com/docs/api-reference/ai/vocr' target='_blank'>documentation</a>.</p>
</div>
""")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("#### Image Source")
vocr_source_type = gr.Radio(
choices=["URL", "File Store Key"],
label="Choose Image Source",
value="URL"
)
vocr_image_url = gr.Textbox(
label="Image URL",
placeholder="https://media.snopes.com/2021/08/239918331_10228097135359041_3825446756894757753_n.jpg",
visible=True
)
vocr_file_key = gr.Textbox(
label="File Store Key",
placeholder="your-file-store-key",
visible=False
)
vocr_prompts = gr.Textbox(
label="Prompts (comma-separated)",
placeholder="total_price, tax, store_name",
info="Prompts to guide data extraction from the image."
)
vocr_page_range = gr.Textbox(
label="Page Range (Optional)",
placeholder="e.g., 1,10",
info="For multi-page docs. Max 10 pages."
)
vocr_btn = gr.Button("Analyze Image", variant="primary")
with gr.Column(scale=2):
gr.Markdown("#### Analysis Results")
vocr_status = gr.Textbox(label="Status", interactive=False)
vocr_image_display = gr.Image(label="Analyzed Image")
vocr_context = gr.JSON(label="Extracted Context", visible=False)
vocr_tags = gr.Textbox(label="Detected Tags", interactive=False, visible=False)
vocr_has_text = gr.Textbox(label="Text Detected?", interactive=False, visible=False)
vocr_sections = gr.JSON(label="Full OCR Sections", visible=False)
def update_vocr_source(source_type):
is_url = source_type == "URL"
return gr.update(visible=is_url), gr.update(visible=not is_url)
vocr_source_type.change(
update_vocr_source,
inputs=vocr_source_type,
outputs=[vocr_image_url, vocr_file_key]
)
vocr_btn.click(
vocr,
inputs=[vocr_source_type, vocr_image_url, vocr_file_key, vocr_prompts, vocr_page_range],
outputs=[vocr_status, vocr_image_display, vocr_context, vocr_tags, vocr_has_text, vocr_sections]
)
demo.launch() |