|
import gradio as gr |
|
import requests |
|
import json |
|
import os |
|
import time |
|
from collections import defaultdict |
|
from PIL import Image |
|
import io |
|
import pandas as pd |
|
|
|
# Root endpoint shared by every JigsawStack REST call made from this module.
BASE_URL = "https://api.jigsawstack.com/v1"

# Authentication header attached to each API request. The key is read from
# the environment once, at import time; when JIGSAWSTACK_API_KEY is unset the
# header value is None.
headers = {"x-api-key": os.environ.get("JIGSAWSTACK_API_KEY")}

# Rate-limit policy: one client IP may make at most MAX_REQUESTS calls within
# any TIME_WINDOW-second span.
MAX_REQUESTS = 20
TIME_WINDOW = 3600

# In-memory bookkeeping for the rate limiter: client IP -> list of
# time.time() stamps of that client's accepted requests. The state is
# process-local and starts empty on every launch.
request_times = defaultdict(list)
|
|
|
def get_real_ip(request: gr.Request):
    """Return the best-effort client IP address for a request.

    The left-most entry of the comma-separated ``x-forwarded-for`` header is
    preferred. When the header is absent, or its first entry is blank, the
    host of the direct connection (``request.client.host``) is used instead.

    Args:
        request: The incoming request object, or a falsy value when no
            request information is available.

    Returns:
        The IP address as a string, or ``"unknown"`` when there is no
        request or no address can be determined from it.
    """
    if not request:
        return "unknown"

    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        # Only the first hop matters, so stop at the first comma.
        first_hop = forwarded.partition(",")[0].strip()
        if first_hop:
            return first_hop
        # A malformed header such as " , 10.0.0.1" has no usable first
        # entry; fall through to the connection address instead of
        # returning an empty string as the client identity.

    # The connection info is not guaranteed to be populated (``client`` may
    # be missing or None), so look it up defensively rather than letting an
    # AttributeError escape into the request handler.
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    return host or "unknown"
|
|
|
def check_rate_limit(request: gr.Request):
    """Check the caller against the per-IP quota and record accepted calls.

    A client IP (as resolved by ``get_real_ip``) may make at most
    ``MAX_REQUESTS`` requests within any ``TIME_WINDOW``-second span. The
    timestamps of accepted requests are kept in the module-level
    ``request_times`` mapping; rejected requests are not recorded.

    Args:
        request: The incoming request object, or a falsy value when no
            request information is available.

    Returns:
        An ``(allowed, message)`` tuple. ``allowed`` is False only when the
        quota is exhausted, in which case ``message`` states the limit and
        how long to wait. On success ``message`` is an empty string. When
        ``request`` is falsy the check fails open and returns True together
        with a diagnostic message.
    """
    if not request:
        # Fail open: without request info there is no IP to attribute to.
        return True, "Rate limit check failed - no request info"

    ip = get_real_ip(request)
    now = time.time()

    # Drop other clients whose every timestamp has aged out of the window.
    # Without this, the mapping keeps one entry per IP ever seen and grows
    # without bound. Removing such an entry is equivalent to the pruning
    # below yielding an empty list the next time that client shows up.
    # Iterating over a snapshot keeps deletion safe; no locking is used
    # here, so the bookkeeping remains best-effort.
    for tracked_ip, stamps in list(request_times.items()):
        if tracked_ip != ip and all(now - t >= TIME_WINDOW for t in stamps):
            request_times.pop(tracked_ip, None)

    # Keep only this client's requests that are still inside the window.
    recent = [t for t in request_times[ip] if now - t < TIME_WINDOW]
    request_times[ip] = recent

    if len(recent) >= MAX_REQUESTS:
        # The oldest surviving request is the next one to expire, which is
        # when a slot frees up again.
        time_remaining = int(TIME_WINDOW - (now - recent[0]))
        time_remaining_minutes = round(time_remaining / 60, 1)
        time_window_minutes = round(TIME_WINDOW / 60, 1)
        return False, f"Rate limit exceeded. You can make {MAX_REQUESTS} requests per {time_window_minutes} minutes. Try again in {time_remaining_minutes} minutes."

    recent.append(now)
    return True, ""
|
|
|
# Upper bound, in seconds, on how long a single sentiment API call may block.
# Without a timeout, requests waits indefinitely on an unresponsive server.
_SENTIMENT_TIMEOUT_SECONDS = 30

# API field name -> column title shown in the sentence-level table.
_SENTENCE_COLUMNS = {
    "text": "Sentence",
    "emotion": "Emotion",
    "sentiment": "Sentiment",
    "score": "Score",
}


def _sentiment_failure(message):
    """Return the handler's 5-tuple for a failed call: status text only."""
    return message, None, None, None, None


def _sentence_table(sentences):
    """Build the sentence-level DataFrame from the API's sentence records."""
    if not sentences:
        return pd.DataFrame(columns=list(_SENTENCE_COLUMNS.values()))
    # reindex (rather than plain column selection) keeps the table shape
    # stable when a record set lacks one of the expected fields; the missing
    # column is filled with NaN instead of raising KeyError.
    frame = pd.DataFrame(sentences).reindex(columns=list(_SENTENCE_COLUMNS))
    return frame.rename(columns=_SENTENCE_COLUMNS)


def analyze_sentiment(text, request: gr.Request):
    """Run sentiment analysis on ``text`` through the JigsawStack API.

    The call is first checked against the per-IP rate limit, then the text
    is validated and posted to ``{BASE_URL}/ai/sentiment``.

    Args:
        text: The text to analyze. Leading and trailing whitespace is
            stripped before sending.
        request: The incoming request object, used only for rate limiting.

    Returns:
        A 5-tuple ``(status, emotion, sentiment, score, sentence_table)``.
        On success ``score`` is a string and ``sentence_table`` is a pandas
        DataFrame with the columns Sentence, Emotion, Sentiment and Score.
        On any failure (rate limit, empty input, HTTP or API error) only
        ``status`` is set and the other four items are None. Errors are
        reported through ``status``; no exception is raised for them.
    """
    rate_limit_ok, rate_limit_msg = check_rate_limit(request)
    if not rate_limit_ok:
        return _sentiment_failure(f"❌ {rate_limit_msg}")

    if not text or not text.strip():
        return _sentiment_failure("Error: Text input is required.")

    try:
        response = requests.post(
            f"{BASE_URL}/ai/sentiment",
            headers=headers,
            json={"text": text.strip()},
            timeout=_SENTIMENT_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        result = response.json()

        if not result.get("success"):
            return _sentiment_failure(
                f"Error: API call failed - {result.get('message', 'Unknown error')}"
            )

        # `or {}` also covers an explicit null payload, not just a missing key.
        sentiment_data = result.get("sentiment") or {}

        return (
            "✅ Sentiment analysis complete.",
            sentiment_data.get("emotion", "N/A"),
            sentiment_data.get("sentiment", "N/A"),
            str(sentiment_data.get("score", "N/A")),
            _sentence_table(sentiment_data.get("sentences")),
        )
    except requests.exceptions.RequestException as e:
        # Covers connection errors, timeouts and non-2xx responses.
        return _sentiment_failure(f"Request failed: {e}")
    except Exception as e:
        # UI boundary: show anything unexpected in the status box instead of
        # letting the handler crash.
        return _sentiment_failure(f"An unexpected error occurred: {e}")
|
|
|
|
|
# Single-page UI: a text input on the left, overall results on the right and
# a per-sentence table underneath.
with gr.Blocks() as demo:
    gr.Markdown("""
    <div style='text-align: center; margin-bottom: 24px;'>
    <h1 style='font-size:2.2em; margin-bottom: 0.2em;'>🧩 Analyze Sentiment</h1>
    <p style='font-size:1.2em; margin-top: 0;'>Perform line-by-line sentiment analysis on any text with detailed emotion detection.</p>
    <p style='font-size:1em; margin-top: 0.5em;'>For more details and API usage, see the <a href='https://jigsawstack.com/docs/api-reference/ai/sentiment' target='_blank'>documentation</a>.</p>
    </div>
    """)

    with gr.Row():
        # Left column: the text to analyze and the trigger button.
        with gr.Column():
            gr.Markdown("#### Input Text")
            sentiment_text = gr.Textbox(
                label="Text to Analyze",
                lines=8,
                placeholder="Enter the text you want to analyze here...",
            )
            sentiment_btn = gr.Button("Analyze Sentiment", variant="primary")

        # Right column: read-only status and overall results.
        with gr.Column():
            gr.Markdown("#### Overall Analysis")
            sentiment_status = gr.Textbox(label="Status", interactive=False)
            sentiment_emotion = gr.Textbox(label="Overall Emotion", interactive=False)
            sentiment_sentiment = gr.Textbox(label="Overall Sentiment", interactive=False)
            sentiment_score = gr.Textbox(label="Overall Score", interactive=False)

    gr.Markdown("#### Sentence-Level Breakdown")
    sentiment_sentences_df = gr.DataFrame(label="Sentence Analysis")

    # The outputs are listed in the same order as the 5-tuple returned by
    # analyze_sentiment. Its `request` parameter is deliberately absent from
    # `inputs`; it is expected to be supplied by Gradio based on the
    # gr.Request annotation.
    sentiment_btn.click(
        analyze_sentiment,
        inputs=[sentiment_text],
        outputs=[
            sentiment_status,
            sentiment_emotion,
            sentiment_sentiment,
            sentiment_score,
            sentiment_sentences_df,
        ],
    )

demo.launch()
|
|