File size: 8,856 Bytes
ad6bc96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6151e93
ad6bc96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import gradio as gr
import requests
import json
import os
import time
from collections import defaultdict

# Root of the JigsawStack REST API; endpoint paths are appended to this.
BASE_URL = "https://api.jigsawstack.com/v1"

# Auth header sent with every API call. The key is read from the environment
# once at import time and is None when JIGSAWSTACK_API_KEY is not set.
headers = {"x-api-key": os.environ.get("JIGSAWSTACK_API_KEY")}


# Rate limiting configuration
MAX_REQUESTS = 20  # Requests allowed per client within one window
TIME_WINDOW = 60 * 60  # Window length in seconds (1 hour)
# Per-client list of request timestamps, keyed by client IP (in-memory only).
request_times = defaultdict(list)

def get_real_ip(request: gr.Request):
    """Return the client IP for ``request``, or ``"unknown"`` if it cannot be determined.

    The first non-blank entry of the ``x-forwarded-for`` header is preferred;
    otherwise the host of the directly connected peer (``request.client.host``)
    is used.

    Args:
        request: The incoming request, or a falsy value when none is available.

    Returns:
        The IP address as a string, or ``"unknown"`` when there is no request,
        no usable header entry and no peer address.

    NOTE(review): ``x-forwarded-for`` is taken at face value here; unless a
    trusted proxy overwrites it, a client can set it to any string.
    """
    if not request:
        return "unknown"

    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        # The header is a comma-separated chain with the originating client
        # first. Skip blank entries so a malformed value such as ", 1.2.3.4"
        # does not produce an empty-string IP shared by unrelated clients.
        for entry in forwarded.split(","):
            candidate = entry.strip()
            if candidate:
                return candidate

    # Fallback: the directly connected peer. The client info may be missing,
    # in which case there is nothing better to report than "unknown".
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    return host or "unknown"

def check_rate_limit(request: gr.Request):
    """Record the request and report whether its client is within the rate limit.

    Keeps a per-IP list of timestamps in ``request_times`` and allows at most
    ``MAX_REQUESTS`` entries within the last ``TIME_WINDOW`` seconds.

    Returns:
        A ``(allowed, message)`` tuple. ``message`` is empty for a normally
        accepted request and explains the wait time when the limit is hit.
        A missing request is allowed through, with an explanatory message.
    """
    if not request:
        return True, "Rate limit check failed - no request info"

    client_ip = get_real_ip(request)
    current = time.time()

    # Keep only the timestamps that still fall inside the sliding window.
    recent = [stamp for stamp in request_times[client_ip] if current - stamp < TIME_WINDOW]
    request_times[client_ip] = recent

    if len(recent) < MAX_REQUESTS:
        # Under the limit: count this request and let it through.
        recent.append(current)
        return True, ""

    # Over the limit: the oldest retained timestamp decides when a slot frees up.
    seconds_left = int(TIME_WINDOW - (current - recent[0]))
    wait_minutes = round(seconds_left / 60, 1)
    window_minutes = round(TIME_WINDOW / 60, 1)
    return False, f"Rate limit exceeded. You can make {MAX_REQUESTS} requests per {window_minutes} minutes. Try again in {wait_minutes} minutes."


# ----------------- JigsawStack API Wrappers ------------------

# (connect, read) timeouts in seconds for the vOCR HTTP call. Without a timeout
# `requests` waits indefinitely, so a stalled upstream would hang the handler.
# NOTE(review): the read budget is an estimate for multi-page documents - tune if needed.
_VOCR_TIMEOUT = (10, 120)


def _vocr_error(message, img_src):
    """Build the six-output tuple for a failed run: status, image, four hidden result fields."""
    return (
        message,
        img_src,
        *(gr.update(visible=False) for _ in range(4)),
    )


def _parse_page_range(page_range_str):
    """Parse ``"start,end"`` into ``[start, end]``; raise ValueError when it is invalid."""
    parts = [int(p.strip()) for p in page_range_str.split(',')]
    if len(parts) != 2:
        raise ValueError("Page range must be two numbers (e.g., 1,10).")
    start_page, end_page = parts
    if not (start_page > 0 and end_page > 0):
        raise ValueError("Page numbers must be positive.")
    if start_page > end_page:
        raise ValueError("Start page cannot be greater than end page.")
    if (end_page - start_page) >= 10:
        raise ValueError("Page range cannot span more than 10 pages.")
    return [start_page, end_page]


def vocr(source_type, image_url, file_store_key, prompt_str, page_range_str, request: gr.Request):
    """Run JigsawStack vOCR on an image and return values for the six result outputs.

    Args:
        source_type: ``"URL"`` or ``"File Store Key"``; selects which input is used.
        image_url: Image URL, used when ``source_type`` is ``"URL"``.
        file_store_key: File store key, used when ``source_type`` is ``"File Store Key"``.
        prompt_str: Comma-separated prompts; at least one non-blank prompt is required.
        page_range_str: Optional ``"start,end"`` page range (positive, at most 10 pages).
        request: The incoming request, used for per-client rate limiting.

    Returns:
        A 6-tuple ``(status, image, context, tags, has_text, sections)``.
        On any failure (rate limit, validation, HTTP or API error) the status
        carries the message and the four result fields are hidden updates.
        Errors are reported through the status text; nothing is raised to the caller.
    """
    # Check rate limit first
    rate_limit_ok, rate_limit_msg = check_rate_limit(request)
    if not rate_limit_ok:
        return _vocr_error(rate_limit_msg, None)

    # Only a non-blank URL is echoed to the image output; an empty textbox
    # yields None so the image component is cleared instead of given "".
    image_to_display = None
    if source_type == "URL" and isinstance(image_url, str) and image_url.strip():
        image_to_display = image_url.strip()

    try:
        # Validate prompts - ensure a prompt is always provided.
        if not prompt_str or not prompt_str.strip():
            return _vocr_error("Error: Prompt is required.", image_to_display)

        prompts = [p.strip() for p in prompt_str.split(',') if p.strip()]
        if not prompts:
            return _vocr_error("Error: Prompt cannot be empty or just commas.", image_to_display)

        # The prompts are sent as a list rather than a single string.
        payload = {"prompt": prompts}

        # Validate page range
        if page_range_str and page_range_str.strip():
            try:
                payload["page_range"] = _parse_page_range(page_range_str)
            except (ValueError, TypeError) as e:
                return _vocr_error(f"Error: Invalid page range format - {e}", image_to_display)

        if source_type == "URL":
            if not image_url or not image_url.strip():
                return _vocr_error("Error: Image URL is required.", image_to_display)
            payload["url"] = image_url.strip()
        elif source_type == "File Store Key":
            if not file_store_key or not file_store_key.strip():
                return _vocr_error("Error: File Store Key is required.", image_to_display)
            payload["file_store_key"] = file_store_key.strip()
        else:
            return _vocr_error("Error: Invalid image source selected.", image_to_display)

        # A timeout surfaces as a RequestException and is reported below.
        response = requests.post(
            f"{BASE_URL}/vocr",
            headers=headers,
            json=payload,
            timeout=_VOCR_TIMEOUT,
        )
        response.raise_for_status()
        result = response.json()

        if not result.get("success"):
            return _vocr_error(f"Error: vOCR failed - {result.get('message', 'Unknown error')}", image_to_display)

        context = result.get("context", {})
        # Tolerate `"tags": null` and non-string tags instead of failing in join().
        tags = ", ".join(str(tag) for tag in (result.get("tags") or []))
        has_text = str(result.get("has_text", "N/A"))
        sections = result.get("sections", [])

        status = "✅ Successfully processed image with vOCR."

        # Empty result fields stay hidden; has_text is always shown.
        return (
            status,
            image_to_display,
            gr.update(value=context, visible=bool(context)),
            gr.update(value=tags, visible=bool(tags)),
            gr.update(value=has_text, visible=True),
            gr.update(value=sections, visible=bool(sections)),
        )

    except requests.exceptions.RequestException as e:
        return _vocr_error(f"Request failed: {str(e)}", image_to_display)
    except Exception as e:
        # UI boundary: report anything unexpected in the status box rather than crash.
        return _vocr_error(f"An unexpected error occurred: {str(e)}", image_to_display)

# ----------------- Gradio UI ------------------

# Build the single-page UI: a header, then one row with an input column
# (scale 1) and a results column (scale 2).
with gr.Blocks() as demo:
    # Page header with a link to the vOCR API documentation.
    gr.Markdown("""
    <div style='text-align: center; margin-bottom: 24px;'>
        <h1 style='font-size:2.2em; margin-bottom: 0.2em;'>🧩 vOCR</h1>
        <p style='font-size:1.2em; margin-top: 0;'>Extract text from images with advanced AI models.</p>
        <p style='font-size:1em; margin-top: 0.5em;'>For more details and API usage, see the <a href='https://jigsawstack.com/docs/api-reference/ai/vocr' target='_blank'>documentation</a>.</p>
    </div>
    """)

    with gr.Row():
        # Input column: image source selector, the two source fields (only one
        # visible at a time), prompts, optional page range and the run button.
        with gr.Column(scale=1):
            gr.Markdown("#### Image Source")
            vocr_source_type = gr.Radio(
                choices=["URL", "File Store Key"],
                label="Choose Image Source",
                value="URL"
            )
            vocr_image_url = gr.Textbox(
                label="Image URL",
                placeholder="https://media.snopes.com/2021/08/239918331_10228097135359041_3825446756894757753_n.jpg",
                visible=True
            )
            # Hidden initially because the default source is "URL".
            vocr_file_key = gr.Textbox(
                label="File Store Key",
                placeholder="your-file-store-key",
                visible=False
            )
            vocr_prompts = gr.Textbox(
                label="Prompts (comma-separated)",
                placeholder="total_price, tax, store_name",
                info="Prompts to guide data extraction from the image."
            )
            vocr_page_range = gr.Textbox(
                label="Page Range (Optional)",
                placeholder="e.g., 1,10",
                info="For multi-page docs. Max 10 pages."
            )
            vocr_btn = gr.Button("Analyze Image", variant="primary")

        # Results column: the result fields start hidden and are revealed by
        # the updates that `vocr` returns.
        with gr.Column(scale=2):
            gr.Markdown("#### Analysis Results")
            vocr_status = gr.Textbox(label="Status", interactive=False)
            vocr_image_display = gr.Image(label="Analyzed Image")
            vocr_context = gr.JSON(label="Extracted Context", visible=False)
            vocr_tags = gr.Textbox(label="Detected Tags", interactive=False, visible=False)
            vocr_has_text = gr.Textbox(label="Text Detected?", interactive=False, visible=False)
            vocr_sections = gr.JSON(label="Full OCR Sections", visible=False)

    def update_vocr_source(source_type):
        """Return visibility updates for (URL textbox, file-key textbox) so only the selected source shows."""
        is_url = source_type == "URL"
        return gr.update(visible=is_url), gr.update(visible=not is_url)

    # Toggle the two source fields whenever the radio selection changes.
    vocr_source_type.change(
        update_vocr_source,
        inputs=vocr_source_type,
        outputs=[vocr_image_url, vocr_file_key]
    )

    # Run vOCR on click. The five inputs map to vocr's first five parameters;
    # its sixth parameter (`request: gr.Request`) is not listed here -
    # presumably Gradio injects it based on the annotation (confirm in docs).
    # The outputs are in the same order as the tuple vocr returns.
    vocr_btn.click(
        vocr,
        inputs=[vocr_source_type, vocr_image_url, vocr_file_key, vocr_prompts, vocr_page_range],
        outputs=[vocr_status, vocr_image_display, vocr_context, vocr_tags, vocr_has_text, vocr_sections]
    )


# Launch the app; this runs whenever the module is executed (there is no
# `if __name__ == "__main__"` guard).
demo.launch()