# Source: Hugging Face Space page capture (status banner at capture time: "Sleeping").
import os | |
import re | |
import io | |
import gradio as gr | |
from typing import Tuple, Optional, Dict, List | |
from deep_translator import GoogleTranslator | |
from groq import Groq | |
import google.generativeai as genai | |
from dotenv import load_dotenv | |
import requests | |
import pytesseract | |
from PIL import Image | |
# Load environment variables (the API keys the classes below read through
# os.getenv) via python-dotenv before anything else runs.
load_dotenv()
class OCRProcessor:
    """Extract text from images through the OCR.space HTTP API.

    Up to two API keys are read from the ``ocr_space_api_key1`` and
    ``ocr_space_api_key2`` environment variables. Unset keys are skipped so
    that no request is ever sent with a missing key.
    """

    API_URL = "https://api.ocr.space/parse/image"
    # Seconds to wait for the OCR service; without a timeout a stalled
    # connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        # Fetch the API keys from environment variables (secrets)
        candidates = [
            os.getenv("ocr_space_api_key1"),
            os.getenv("ocr_space_api_key2"),
        ]
        self.api_keys = [key for key in candidates if key]

    @staticmethod
    def _describe_error(result) -> str:
        """Return a printable error description from a decoded API response."""
        if not isinstance(result, dict):
            # The service may answer with a bare JSON string instead of an object.
            return str(result) if result else 'Unknown error'
        message = result.get('ErrorMessage') or 'Unknown error'
        if isinstance(message, (list, tuple)):
            message = "; ".join(str(part) for part in message)
        return str(message)

    def ocr_from_image(self, image: "Image.Image") -> str:
        """
        Extract text from an uploaded image using OCR (Optical Character Recognition).

        Each configured API key is tried in turn; the next key is used when a
        request fails or when the API answers without any parsed result.

        Args:
            image (Image.Image): The PIL image object from which text will be extracted.

        Returns:
            str: The extracted text from the image, or an error message if
            every key failed.
        """
        failure_message = "Error in extracting text from image using both API keys."

        if not self.api_keys:
            print("Error from OCR API: no OCR API key configured")
            return failure_message

        # Convert the image to PNG bytes once; the payload is the same for every key.
        try:
            buffer = io.BytesIO()
            image.save(buffer, format='PNG')
            png_bytes = buffer.getvalue()
        except Exception as e:
            print(f"Error encoding image for OCR: {str(e)}")
            return failure_message

        last_api_error = None
        for position, api_key in enumerate(self.api_keys, start=1):
            payload = {
                'apikey': api_key,
                'language': 'eng',  # You can set this to the desired language
            }
            # Send an explicit filename and MIME type so the service can tell
            # the upload is a PNG.
            files = {'file': ('image.png', png_bytes, 'image/png')}
            try:
                response = requests.post(
                    self.API_URL,
                    data=payload,
                    files=files,
                    timeout=self.REQUEST_TIMEOUT,
                )
                result = response.json()
            except Exception as e:
                # Network or decoding problem: report it (without leaking the
                # key itself) and move on to the next key.
                print(f"Error using OCR API key #{position}: {str(e)}")
                continue

            parsed_results = result.get('ParsedResults') if isinstance(result, dict) else None
            if parsed_results:
                extracted_text = parsed_results[0].get('ParsedText') or ""
                return extracted_text.strip()

            # The API answered but produced no parsed text; remember why and
            # give the next key a chance.
            last_api_error = self._describe_error(result)
            print(f"Error from OCR API: {last_api_error}")

        if last_api_error is not None:
            return f"Error in OCR extraction: {last_api_error}"
        # Every key failed before the API could answer.
        return failure_message
class TextCleaner:
    """Handles cleaning and structuring of input text.

    Both public helpers are static: they can be called on the class
    (``TextCleaner.clean_markdown(...)``) or on an instance.
    """

    @staticmethod
    def clean_markdown(text: str) -> str:
        """Collapse over-long runs of markdown markers and whitespace.

        Args:
            text: The text to normalise.

        Returns:
            The text with runs of three or more ``#`` reduced to ``##``, runs
            of three or more ``*``, ``_``, ``~`` or backticks reduced to a
            single character, three or more consecutive newlines reduced to
            two, repeated spaces reduced to one, and surrounding whitespace
            stripped.
        """
        # Normalize header levels (e.g., ### to ##)
        text = re.sub(r'#{3,}', '##', text)
        # Normalize emphasis markers (e.g., *** to *)
        text = re.sub(r'\*{3,}', '*', text)
        # Normalize underscores (e.g., ___ to _)
        text = re.sub(r'_{3,}', '_', text)
        # Normalize strikethrough (e.g., ~~~ to ~)
        text = re.sub(r'~{3,}', '~', text)
        # Remove excessive line breaks and spaces
        text = re.sub(r'\n{3,}', '\n\n', text)  # More than 2 line breaks become 2
        text = re.sub(r' {2,}', ' ', text)  # Multiple spaces become a single space
        # Normalize code block delimiters (e.g., ``` to `)
        text = re.sub(r'`{3,}', '`', text)
        return text.strip()

    @staticmethod
    def _message_content(response):
        """Return the first choice's message content from a chat completion.

        Attribute access (``response.choices[0].message.content``) is tried
        first; plain mapping responses are supported as a fallback.
        """
        try:
            return response.choices[0].message.content
        except AttributeError:
            return response['choices'][0]['message']['content']

    @staticmethod
    def structure_question(text: str, llm_client) -> str:
        """Use LLM to structure messy questions.

        Args:
            text: The raw question text.
            llm_client: A client exposing ``chat.completions.create(...)``.

        Returns:
            The cleaned, restructured text produced by the model, or ``text``
            unchanged if the call or the response handling fails.
        """
        system_prompt = """Please structure the following text into clear, well-formatted questions. If there are multiple questions, separate them clearly. Remove any irrelevant information and improve clarity while maintaining the original meaning."""
        try:
            response = llm_client.chat.completions.create(
                model="gemini-1.5-flash-002",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": text}
                ],
                temperature=0.3,
                max_tokens=2048
            )
            # Clean the response content before returning it
            return TextCleaner.clean_markdown(TextCleaner._message_content(response))
        except Exception as e:
            # Best effort: any failure leaves the caller with the original text.
            print(f"Question structuring error: {str(e)}")
            return text
class LanguageManager:
    """Manages supported languages and their configurations.

    All methods are class methods operating on the shared
    ``SUPPORTED_LANGUAGES`` table, so they work both on the class and on an
    instance. Language names are matched case-insensitively, and spaces are
    treated like underscores so that a display name such as
    "Chinese Simplified" resolves to the ``chinese_simplified`` entry.
    """

    # Maps lower-case language names (underscore separated) to language codes.
    # NOTE(review): some codes (e.g. "zh", "he", "dgr", "bodo", "sik") may not
    # match the identifiers accepted by the translation backend - confirm
    # against its supported-language list.
    SUPPORTED_LANGUAGES: Dict[str, str] = {
        # Major World Languages
        "english": "en",
        "spanish": "es",
        "french": "fr",
        "german": "de",
        "portuguese": "pt",
        "italian": "it",
        "russian": "ru",
        "arabic": "ar",
        "japanese": "ja",
        "korean": "ko",
        "chinese_simplified": "zh",
        "dutch": "nl",
        "polish": "pl",
        "turkish": "tr",
        "vietnamese": "vi",
        "thai": "th",
        "indonesian": "id",
        "malay": "ms",
        "filipino": "tl",
        "greek": "el",
        "hebrew": "he",
        "czech": "cs",
        "slovak": "sk",
        "swedish": "sv",
        "danish": "da",
        "finnish": "fi",
        "norwegian": "no",
        "romanian": "ro",
        "hungarian": "hu",
        "bulgarian": "bg",
        "croatian": "hr",
        "serbian": "sr",
        "ukrainian": "uk",
        "persian": "fa",
        "swahili": "sw",
        # Indian Languages
        "hindi": "hi",
        "bengali": "bn",
        "tamil": "ta",
        "telugu": "te",
        "marathi": "mr",
        "gujarati": "gu",
        "kannada": "kn",
        "malayalam": "ml",
        "punjabi": "pa",
        "odia": "or",
        "assamese": "as",
        "sanskrit": "sa",
        "maithili": "mai",
        "kashmiri": "ks",
        "dogri": "dgr",
        "konkani": "kok",
        "nepali": "ne",
        "manipuri": "mni",
        "sindhi": "sd",
        "santhali": "sat",
        "bodo": "bodo",
        "mauritian": "mfe",  # Mauritian Creole
        "rajastani": "raj",
        "sikkimese": "sik",
        # Additional World Languages ("nepali" is already listed above)
        "azerbaijani": "az",
        "kazakh": "kk",
        "mongolian": "mn",
        "sinhala": "si",
        "urdu": "ur",
        "myanmar": "my",
        "khmer": "km",
        "lao": "lo",
    }

    @staticmethod
    def _normalize_name(language_name: str) -> str:
        """Return the table key form of a name: lower-case, underscore separated."""
        return "_".join(language_name.strip().lower().split())

    @classmethod
    def get_language_code(cls, language_name: str) -> str:
        """Get language code from language name (case-insensitive).

        Args:
            language_name: A language name such as "French" or
                "Chinese Simplified".

        Returns:
            The matching code, or ``"en"`` when the name is empty or unknown.
        """
        if not language_name:
            return "en"
        return cls.SUPPORTED_LANGUAGES.get(cls._normalize_name(language_name), "en")

    @classmethod
    def get_language_name(cls, language_code: str) -> str:
        """Get the title-cased language name for a language code.

        Returns "English" when no entry uses the given code.
        """
        for name, code in cls.SUPPORTED_LANGUAGES.items():
            if code == language_code:
                return name.replace('_', ' ').title()
        return "English"

    @classmethod
    def add_language(cls, language_name: str, language_code: str) -> None:
        """Add a new language to the supported languages.

        An existing entry is never overwritten; a message is printed instead.
        """
        language_name = cls._normalize_name(language_name)
        if language_name not in cls.SUPPORTED_LANGUAGES:
            cls.SUPPORTED_LANGUAGES[language_name] = language_code
        else:
            print(f"Language '{language_name}' already exists.")

    @classmethod
    def remove_language(cls, language_name: str) -> None:
        """Remove a language from the supported languages.

        A message is printed when the language is not present.
        """
        language_name = cls._normalize_name(language_name)
        if language_name in cls.SUPPORTED_LANGUAGES:
            del cls.SUPPORTED_LANGUAGES[language_name]
        else:
            print(f"Language '{language_name}' not found.")

    @classmethod
    def list_supported_languages(cls) -> Dict[str, str]:
        """Get all supported languages with their codes.

        The shared table itself is returned, not a copy.
        """
        return cls.SUPPORTED_LANGUAGES
class APIKeyManager:
    """Manages API keys for different LLM services with rotation support.

    Keys are read from numbered environment variables (suffixes 1 to 5).
    The current key of each service is returned by ``get_groq_key`` /
    ``get_gemini_key``; the ``request_with_*`` helpers advance to the next
    key whenever a request is answered with HTTP 429.
    """

    # Seconds to wait for an HTTP response before giving up on a request.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Load the keys from the environment.

        Raises:
            ValueError: If no Groq key or no Gemini key is configured.
        """
        # "GORQ_API_KEY_<n>" is the historical (misspelled) variable name and
        # keeps priority; the correctly spelled "GROQ_API_KEY_<n>" is also accepted.
        self.groq_keys: List[str] = self._load_keys("GORQ_API_KEY", "GROQ_API_KEY")
        self.gemini_keys: List[str] = self._load_keys("GEMINI_API_KEY")
        # Ensure keys are available
        if not self.groq_keys:
            raise ValueError("No valid Groq API keys found in environment variables.")
        if not self.gemini_keys:
            raise ValueError("No valid Gemini API keys found in environment variables.")
        # Initialize key rotation indices
        self.current_groq_index: int = 0
        self.current_gemini_index: int = 0

    @staticmethod
    def _load_keys(*prefixes: str) -> List[str]:
        """Collect non-empty values of ``<prefix>_1`` .. ``<prefix>_5``.

        For each number the first prefix that has a value wins.
        """
        keys: List[str] = []
        for number in range(1, 6):
            for prefix in prefixes:
                value = os.getenv(f"{prefix}_{number}")
                if value:
                    keys.append(value)
                    break
        return keys

    def _rotate_key(self, keys: List[str], index: int) -> int:
        """Rotates the key index, returns new index (wrapping around)."""
        return (index + 1) % len(keys)

    def get_groq_key(self) -> str:
        """Returns the current Groq API key (does not rotate by itself)."""
        return self.groq_keys[self.current_groq_index]

    def get_gemini_key(self) -> str:
        """Returns the current Gemini API key (does not rotate by itself)."""
        return self.gemini_keys[self.current_gemini_index]

    def _request_with_rotation(self, url: str, params: dict, provider: str,
                               keys: List[str], index_attr: str):
        """GET ``url`` with bearer auth, rotating ``keys`` on HTTP 429.

        Each key is tried at most once. ``index_attr`` names the instance
        attribute holding the current index for this provider.

        Raises:
            requests.exceptions.HTTPError: For any non-429 error status, or
                the last 429 error once every key has been rate limited.
        """
        last_error = None
        for _ in range(len(keys)):
            index = getattr(self, index_attr)
            headers = {"Authorization": f"Bearer {keys[index]}"}
            response = requests.get(
                url, headers=headers, params=params, timeout=self.REQUEST_TIMEOUT
            )
            try:
                response.raise_for_status()  # raises HTTPError for bad responses
            except requests.exceptions.HTTPError as e:
                if response.status_code != 429:  # Assuming 429 as the rate limit error
                    raise  # for non-rate-limit errors, re-raise the exception
                # Report the key by position only; never write secrets to logs.
                print(f"Rate limit hit for {provider} key #{index + 1}. Rotating key...")
                setattr(self, index_attr, self._rotate_key(keys, index))
                last_error = e
                continue
            return response.json()  # successful request, return data
        if last_error is None:
            raise RuntimeError(f"No {provider} API keys available.")
        # Every key was rate limited: surface the failure instead of returning None.
        raise last_error

    def request_with_groq(self, url: str, params: dict):
        """Makes a request using Groq API keys, rotating on rate limit errors.

        Returns:
            The decoded JSON body of the first successful response.

        Raises:
            requests.exceptions.HTTPError: On a non-429 error, or when all
                keys are rate limited.
        """
        return self._request_with_rotation(
            url, params, "Groq", self.groq_keys, "current_groq_index"
        )

    def request_with_gemini(self, url: str, params: dict):
        """Makes a request using Gemini API keys, rotating on rate limit errors.

        NOTE(review): the key is sent as a bearer token - confirm that the
        target endpoint accepts this form of authentication.

        Returns:
            The decoded JSON body of the first successful response.

        Raises:
            requests.exceptions.HTTPError: On a non-429 error, or when all
                keys are rate limited.
        """
        return self._request_with_rotation(
            url, params, "Gemini", self.gemini_keys, "current_gemini_index"
        )
class TranslationManager:
    """Manages translation between different languages.

    Translations are cached per (source, target, text). Text longer than
    ``MAX_CHUNK_CHARS`` is translated piece by piece, because the translation
    backend limits the size of a single request and would otherwise fail,
    leaving long answers untranslated.
    """

    # Kept below the backend's per-request character limit.
    MAX_CHUNK_CHARS = 4500
    # Upper bound on cached translations so a long-running app does not grow
    # without limit; the oldest entry is dropped first.
    MAX_CACHE_ENTRIES = 512

    def __init__(self):
        self._supported_languages = set(LanguageManager.SUPPORTED_LANGUAGES.values())
        self._translation_cache: Dict[str, str] = {}

    def _get_cache_key(self, text: str, source_lang: str, target_lang: str) -> str:
        """Generate a cache key for translation."""
        return f"{source_lang}:{target_lang}:{text}"

    def _split_into_chunks(self, text: str) -> List[str]:
        """Split ``text`` into consecutive pieces of at most MAX_CHUNK_CHARS.

        Pieces concatenate back to ``text`` exactly. A piece ends right after
        a newline when possible, otherwise after a space, otherwise at the
        size limit.
        """
        limit = self.MAX_CHUNK_CHARS
        chunks: List[str] = []
        start = 0
        while len(text) - start > limit:
            window_end = start + limit
            cut = text.rfind("\n", start, window_end)
            if cut <= start:
                cut = text.rfind(" ", start, window_end)
            if cut <= start:
                cut = window_end - 1  # no natural break: hard split
            chunks.append(text[start:cut + 1])
            start = cut + 1
        chunks.append(text[start:])
        return chunks

    @staticmethod
    def _translate_chunk(translator, chunk: str) -> str:
        """Translate one piece while keeping its leading/trailing whitespace."""
        core = chunk.strip()
        if not core:
            return chunk  # nothing to translate (blank lines only)
        translated = translator.translate(core)
        if translated is None:
            translated = core
        leading = chunk[:len(chunk) - len(chunk.lstrip())]
        trailing = chunk[len(chunk.rstrip()):]
        return leading + translated + trailing

    def _store_in_cache(self, cache_key: str, translated_text: str) -> None:
        """Remember a translation, evicting the oldest entry when full."""
        if len(self._translation_cache) >= self.MAX_CACHE_ENTRIES:
            # dicts preserve insertion order, so the first key is the oldest.
            self._translation_cache.pop(next(iter(self._translation_cache)))
        self._translation_cache[cache_key] = translated_text

    def translate_text(self, text: str, source_lang: str, target_lang: str) -> str:
        """Translate text between languages with caching and error handling.

        Args:
            text: The text to translate.
            source_lang: Source language code; unsupported codes fall back to
                automatic detection.
            target_lang: Target language code.

        Returns:
            The translated text. The original text is returned when source
            and target are equal, the text is blank, the target language is
            unsupported, or translation fails.
        """
        if source_lang == target_lang:
            return text
        if not text or not text.strip():
            return text
        cache_key = self._get_cache_key(text, source_lang, target_lang)
        if cache_key in self._translation_cache:
            return self._translation_cache[cache_key]
        try:
            if source_lang not in self._supported_languages:
                print(f"Warning: Unsupported source language {source_lang}")
                source_lang = 'auto'  # auto-detect if source language is unsupported
            if target_lang not in self._supported_languages:
                print(f"Warning: Unsupported target language {target_lang}")
                return text  # return original text if target language is unsupported
            # Using GoogleTranslator from deep_translator
            translator = GoogleTranslator(source=source_lang, target=target_lang)
            if len(text) <= self.MAX_CHUNK_CHARS:
                translated_text = translator.translate(text)
            else:
                translated_text = "".join(
                    self._translate_chunk(translator, chunk)
                    for chunk in self._split_into_chunks(text)
                )
            if translated_text is None:
                # Never cache or return a non-string result.
                return text
            self._store_in_cache(cache_key, translated_text)
            return translated_text
        except Exception as e:
            # Best effort: fall back to the untranslated text.
            print(f"Translation error: {str(e)}")
            return text
class ModelManager:
    """Manages different language models and their responses.

    A question type selects the primary model. If the primary model fails,
    one attempt is made with a model from the other provider before an
    apology message is returned.
    """

    # Groq model used as the fallback when the Gemini model was the one that failed.
    FALLBACK_GROQ_MODEL = "llama3-70b-8192"

    def __init__(self, api_key_manager: APIKeyManager):
        """Store the key manager and the question-type -> model-name table."""
        self.api_key_manager = api_key_manager
        self.model_configs = {
            "math": "llama3-70b-8192",
            "job": "llama-3.2-90b-text-preview",
            "general": "gemini-1.5-flash-002"
        }

    def get_model_response(self, question: str, question_type: str, language: str) -> str:
        """Get response from appropriate model with preprocessing.

        Args:
            question: The question text sent to the model.
            question_type: Key into ``model_configs``; unknown types use the
                "general" model.
            language: Accepted for interface compatibility; not used here.

        Returns:
            The model's answer with markdown normalised, the fallback model's
            answer if the primary model fails, or an apology message if both
            fail.
        """
        model_name = self.determine_model(question, question_type)
        try:
            if "gemini" in model_name:
                response = self._get_gemini_response(question)
            else:
                response = self._get_groq_response(question, model_name)
            return TextCleaner.clean_markdown(response)
        except Exception as e:
            print(f"Error with primary model: {str(e)}")
            return self._get_fallback_response(question, failed_model=model_name)

    def determine_model(self, question: str, question_type: str) -> str:
        """Determine which model to use based on question type.

        ``question`` is currently not inspected; only ``question_type`` matters.
        """
        return self.model_configs.get(question_type, self.model_configs["general"])

    def _get_groq_response(self, question: str, model_name: str) -> str:
        """Get response from Groq model.

        Raises:
            Exception: Wrapping any error raised by the Groq client, with the
                original error attached as the cause.
        """
        try:
            client = Groq(api_key=self.api_key_manager.get_groq_key())
            system_prompt = (
                "You are a helpful assistant.\n"
                "For calculations, show step-by-step solutions,\n"
                "explain each step clearly, and double-check calculations."
            )
            response = client.chat.completions.create(
                model=model_name,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": question}
                ],
                temperature=0.7,
                max_tokens=8192
            )
            return response.choices[0].message.content
        except Exception as e:
            raise Exception(f"Groq API error: {str(e)}") from e

    def _get_gemini_response(self, question: str) -> str:
        """Get response from Gemini model.

        Raises:
            Exception: Wrapping any error raised by the Gemini client, with
                the original error attached as the cause.
        """
        try:
            genai.configure(api_key=self.api_key_manager.get_gemini_key())
            model = genai.GenerativeModel(
                model_name="gemini-1.5-flash-002",
                generation_config={
                    "temperature": 0.7,
                    "max_output_tokens": 8192,
                }
            )
            response = model.generate_content(question)
            return response.text
        except Exception as e:
            raise Exception(f"Gemini API error: {str(e)}") from e

    def _get_fallback_response(self, question: str, failed_model: Optional[str] = None) -> str:
        """Get fallback response if primary model fails.

        Args:
            question: The question text sent to the fallback model.
            failed_model: Name of the model that just failed, if known. When
                it is a Gemini model the Groq fallback is used instead of
                calling Gemini a second time; otherwise Gemini is used.

        Returns:
            The cleaned fallback answer, or an apology message if the
            fallback fails as well.
        """
        try:
            if failed_model and "gemini" in failed_model:
                response = self._get_groq_response(question, self.FALLBACK_GROQ_MODEL)
            else:
                response = self._get_gemini_response(question)
            return TextCleaner.clean_markdown(response)
        except Exception as e:
            print(f"Error with fallback model: {str(e)}")
            return "I apologize, but I'm unable to process your request at the moment. Please try again later."
# OCR Integration with Question Processing | |
class QuestifyAI:
    """Main application class that manages multilingual question answering."""

    OCR_API_URL = "https://api.ocr.space/parse/image"
    # Returned by ocr_from_image when no text could be extracted.
    OCR_ERROR_MESSAGE = "Error in extracting text from image."
    # Seconds to wait for the OCR service before treating the request as failed.
    OCR_TIMEOUT = 30

    def __init__(self):
        """Initialize QuestifyAI with key management and model components."""
        self.api_key_manager = APIKeyManager()
        self.translation_manager = TranslationManager()
        self.language_manager = LanguageManager()
        self.model_manager = ModelManager(self.api_key_manager)
        # Fetch OCR API keys from Hugging Face Space secrets; unset keys are skipped.
        self.ocr_api_keys = [
            key
            for key in (os.getenv("ocr_space_api_key1"), os.getenv("ocr_space_api_key2"))
            if key
        ]
        self.api_key_index = 0  # Start with the first API key

    def structure_question(self, question: str) -> str:
        """Organize user input by removing unwanted symbols and ensuring clarity.

        Every ``#``, ``/``, ``*`` and backslash is removed and the result is
        stripped.

        NOTE(review): this also removes ``/`` and ``*`` from arithmetic such
        as "6/3" - confirm this is intended for "math" questions.
        """
        question = re.sub(r"[#/*\\]", "", question)
        return question.strip()

    def clean_response(self, response: str) -> str:
        """Remove any unwanted characters or formatting artifacts.

        Every ``*``, ``•``, ``#`` and backtick is removed and the result is
        stripped.
        """
        return response.translate(str.maketrans("", "", "*•#`")).strip()

    def process_question(
        self,
        question: str,
        input_language: str,
        question_type: str
    ) -> Tuple[str, str, str]:
        """Process a question through translation and model response pipeline.

        Args:
            question: The user's question in ``input_language``.
            input_language: Language name as shown in the UI (for example
                "Chinese Simplified").
            question_type: Model selector passed to the model manager.

        Returns:
            A tuple ``(english_question, english_answer, translated_answer)``.
        """
        # Clean and structure the question
        question = self.structure_question(question)
        # Map the display name back to its table key ("Chinese Simplified" ->
        # "chinese_simplified") before looking up the language code.
        language_key = "_".join((input_language or "english").strip().lower().split())
        language_code = self.language_manager.get_language_code(language_key)
        # Translate to English if needed
        english_question = (
            self.translation_manager.translate_text(question, language_code, "en")
            if language_code != "en" else question
        )
        # Get model response and clean it
        english_answer = self.model_manager.get_model_response(
            english_question, question_type, language_code
        )
        english_answer = self.clean_response(english_answer)
        # Translate answer back to user's language if needed
        translated_answer = (
            self.translation_manager.translate_text(english_answer, "en", language_code)
            if language_code != "en" else english_answer
        )
        return english_question, english_answer, translated_answer

    def ocr_from_image(self, image: Image.Image) -> str:
        """
        Extract text from an uploaded image using OCR.space API.

        Starting with the current key, each configured key is tried at most
        once; the key is switched after every failed attempt.

        Args:
            image (Image.Image): The PIL image object from which text will be extracted.

        Returns:
            str: The extracted text from the image, or ``OCR_ERROR_MESSAGE``
            if every attempt failed.
        """
        if not self.ocr_api_keys:
            print("Error in OCR: no OCR API key configured")
            return self.OCR_ERROR_MESSAGE

        # Encode the image once; the same payload is reused for every key.
        try:
            buffer = io.BytesIO()
            image.save(buffer, format="PNG")
            files_payload = ('image.png', buffer.getvalue(), 'image/png')
        except Exception as e:
            print(f"Error in OCR: {str(e)}")
            return self.OCR_ERROR_MESSAGE

        for _ in range(len(self.ocr_api_keys)):
            current_api_key = self.ocr_api_keys[self.api_key_index]
            try:
                # Send the request to OCR.space API
                response = requests.post(
                    self.OCR_API_URL,
                    files={'file': files_payload},
                    data={'apikey': current_api_key},
                    timeout=self.OCR_TIMEOUT,
                )
                response.raise_for_status()  # Check for errors in the request
                result = response.json()
                # Check if the OCR was successful
                if result.get("OCRExitCode") == 1:
                    return result["ParsedResults"][0]["ParsedText"].strip()
            except requests.exceptions.RequestException as e:
                # Handle request errors
                print(f"Error in OCR request: {str(e)}")
            except Exception as e:
                # Handle general errors (unexpected response shape, bad JSON, ...)
                print(f"Error in OCR: {str(e)}")
            # This key did not produce text: move on to the next one.
            self.switch_api_key()
        return self.OCR_ERROR_MESSAGE

    def switch_api_key(self):
        """Switch to the next OCR API key when the current one reaches its limit or fails."""
        if not self.ocr_api_keys:
            return  # nothing to rotate through
        self.api_key_index = (self.api_key_index + 1) % len(self.ocr_api_keys)
        print(f"Switched to API Key {self.api_key_index + 1}")

    def launch_ui(self):
        """Launch Gradio web interface for QuestifyAI."""
        with gr.Blocks(title="Questify AI") as app:
            gr.Markdown("# Questify AI - Multilingual Question Answering System")
            with gr.Row():
                with gr.Column():
                    # Input textbox for typed questions
                    question_input = gr.Textbox(
                        label="Your Question",
                        placeholder="Type your question here...",
                        lines=5
                    )
                    # Image input for OCR
                    image_input = gr.Image(
                        label="Or Upload an Image for OCR",
                        type="pil"  # Expect a PIL image object
                    )
                    with gr.Row():
                        language_input = gr.Dropdown(
                            label="Select Language",
                            choices=sorted(
                                name.replace('_', ' ').title()
                                for name in LanguageManager.SUPPORTED_LANGUAGES
                            ),
                            value="English"
                        )
                        question_type = gr.Dropdown(
                            label="Question Type",
                            choices=["general", "math", "job"],
                            value="general"
                        )
                    with gr.Row():
                        submit_btn = gr.Button("Submit", variant="primary")
                        clear_btn = gr.Button("Clear")
            with gr.Row():
                with gr.Column():
                    english_question = gr.Textbox(
                        label="Structured English Question",
                        interactive=False,
                        lines=5
                    )
                    english_answer = gr.Textbox(
                        label="Answer in English",
                        interactive=False,
                        lines=5
                    )
                    translated_answer = gr.Textbox(
                        label="Translated Answer",
                        interactive=False,
                        lines=5
                    )

            def handle_submit(question, language, q_type, image):
                """Handle question submission and process."""
                try:
                    # If image is uploaded, use OCR to extract text
                    if image is not None:
                        question = self.ocr_from_image(image)
                        if question == self.OCR_ERROR_MESSAGE:
                            # Show the OCR failure instead of asking the model
                            # to answer the error message.
                            return question, question, question
                    if not question or not question.strip():
                        error_msg = "Error: No question text was provided or detected."
                        return error_msg, error_msg, error_msg
                    return self.process_question(question, language, q_type)
                except Exception as e:
                    error_msg = f"Error: {str(e)}"
                    return error_msg, error_msg, error_msg

            def handle_clear():
                """Clear all input and output fields."""
                return "", None, "", "", ""

            submit_btn.click(
                handle_submit,
                inputs=[question_input, language_input, question_type, image_input],
                outputs=[english_question, english_answer, translated_answer]
            )
            # One output component per value returned by handle_clear.
            clear_btn.click(
                handle_clear,
                outputs=[
                    question_input,
                    image_input,
                    english_question,
                    english_answer,
                    translated_answer,
                ]
            )
        app.launch(debug=True)
# Script entry point: build the application and start its web interface.
if __name__ == "__main__":
    QuestifyAI().launch_ui()