# Questify_AI / app.py
import os
import re
import io
import gradio as gr
from typing import Tuple, Optional, Dict, List
from deep_translator import GoogleTranslator
from groq import Groq
import google.generativeai as genai
from dotenv import load_dotenv
import requests
from PIL import Image
# Load environment variables
load_dotenv()
class OCRProcessor:
def __init__(self):
# Fetch the API keys from environment variables (secrets)
self.api_keys = [
os.getenv("ocr_space_api_key1"), # Get the first API key from environment variable
os.getenv("ocr_space_api_key2") # Get the second API key from environment variable
]
def ocr_from_image(self, image: Image.Image) -> str:
"""
Extract text from an uploaded image using OCR (Optical Character Recognition).
Args:
image (Image.Image): The PIL image object from which text will be extracted.
Returns:
str: The extracted text from the image.
"""
for api_key in self.api_keys:
try:
# Prepare the API endpoint and data
url = "https://api.ocr.space/parse/image"
payload = {
'apikey': api_key,
'language': 'eng', # You can set this to the desired language
}
# Convert the image to bytes (PIL image to byte array)
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG')
img_byte_arr = img_byte_arr.getvalue()
# Make the API request
response = requests.post(url, data=payload, files={'file': img_byte_arr})
# Parse the JSON response
result = response.json()
# Check if the response is valid and contains parsed text
if 'ParsedResults' in result:
extracted_text = result['ParsedResults'][0]['ParsedText']
return extracted_text.strip()
                else:
                    # No parsed text returned; log the error and move on to the
                    # next API key instead of giving up immediately.
                    error_message = result.get('ErrorMessage', 'Unknown error')
                    print(f"Error from OCR API: {error_message}")
except Exception as e:
# If an error occurs (e.g., network issues), print the error and try the next API key
print(f"Error using API key {api_key}: {str(e)}")
# If both API keys fail, return a final error message
return "Error in extracting text from image using both API keys."
class TextCleaner:
"""Handles cleaning and structuring of input text."""
@staticmethod
def clean_markdown(text: str) -> str:
"""Remove unnecessary markdown and formatting symbols."""
        # Normalize header levels (e.g., ### or deeper to ##)
        text = re.sub(r'#{3,}', '##', text)
        # Normalize emphasis markers (e.g., *** or more to a single *)
        text = re.sub(r'\*{3,}', '*', text)
        # Normalize underscores (e.g., ___ or more to a single _)
        text = re.sub(r'_{3,}', '_', text)
        # Normalize strikethrough (e.g., ~~~ or more to a single ~)
        text = re.sub(r'~{3,}', '~', text)
        # Remove excessive line breaks and spaces
        text = re.sub(r'\n{3,}', '\n\n', text)  # Replace 3+ line breaks with 2
        text = re.sub(r' {2,}', ' ', text)      # Collapse multiple spaces into one
        # Normalize code block delimiters (e.g., ``` to `)
        text = re.sub(r'`{3,}', '`', text)
return text.strip()
@staticmethod
def structure_question(text: str, llm_client) -> str:
"""Use LLM to structure messy questions."""
system_prompt = """Please structure the following text into clear, well-formatted questions. If there are multiple questions, separate them clearly. Remove any irrelevant information and improve clarity while maintaining the original meaning."""
        try:
            # llm_client is expected to be an OpenAI-compatible client (e.g., Groq),
            # so use a Groq-hosted model and attribute-style access on the response.
            response = llm_client.chat.completions.create(
                model="llama3-70b-8192",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": text}
                ],
                temperature=0.3,
                max_tokens=2048
            )
            # Clean the response content before returning it
            return TextCleaner.clean_markdown(response.choices[0].message.content)
except Exception as e:
print(f"Question structuring error: {str(e)}")
return text
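# A small illustrative sketch of TextCleaner.clean_markdown (the input string is
# just an assumed example of messy LLM markdown, not real model output):
#
#     raw = "#### Title\n\n\n\nSome ***very*** bold   text\n``````"
#     TextCleaner.clean_markdown(raw)
#     # -> "## Title\n\nSome *very* bold text\n`"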
class LanguageManager:
"""Manages supported languages and their configurations."""
SUPPORTED_LANGUAGES: Dict[str, str] = {
# Major World Languages
"english": "en",
"spanish": "es",
"french": "fr",
"german": "de",
"portuguese": "pt",
"italian": "it",
"russian": "ru",
"arabic": "ar",
"japanese": "ja",
"korean": "ko",
"chinese_simplified": "zh",
"dutch": "nl",
"polish": "pl",
"turkish": "tr",
"vietnamese": "vi",
"thai": "th",
"indonesian": "id",
"malay": "ms",
"filipino": "tl",
"greek": "el",
"hebrew": "he",
"czech": "cs",
"slovak": "sk",
"swedish": "sv",
"danish": "da",
"finnish": "fi",
"norwegian": "no",
"romanian": "ro",
"hungarian": "hu",
"bulgarian": "bg",
"croatian": "hr",
"serbian": "sr",
"ukrainian": "uk",
"persian": "fa",
"swahili": "sw",
# Indian Languages
"hindi": "hi",
"bengali": "bn",
"tamil": "ta",
"telugu": "te",
"marathi": "mr",
"gujarati": "gu",
"kannada": "kn",
"malayalam": "ml",
"punjabi": "pa",
"odia": "or",
"assamese": "as",
"sanskrit": "sa",
"maithili": "mai", # Added Maithili
"kashmiri": "ks", # Added Kashmiri
"dogri": "dgr", # Added Dogri
"konkani": "kok", # Added Konkani
"nepali": "ne", # Added Nepali
"manipuri": "mni", # Added Manipuri
"sindhi": "sd", # Added Sindhi
"santhali": "sat", # Added Santhali
"bodo": "bodo", # Added Bodo
"mauritian": "mfe", # Added Mauritian Creole (influenced by Indian culture)
"rajastani": "raj", # Added Rajasthani
"sikkimese": "sik", # Added Sikkimese (Lepcha and Bhutia)
# Additional World Languages
"azerbaijani": "az",
"kazakh": "kk",
"mongolian": "mn",
"nepali": "ne",
"sinhala": "si",
"urdu": "ur",
"myanmar": "my",
"khmer": "km",
"lao": "lo"
}
@classmethod
def get_language_code(cls, language_name: str) -> str:
"""Get language code from language name (case-insensitive)."""
# Normalize the input to lower case to handle case-insensitivity
language_name = language_name.strip().lower()
return cls.SUPPORTED_LANGUAGES.get(language_name, "en")
@classmethod
def get_language_name(cls, language_code: str) -> str:
"""Get language name from language code."""
# Find language name corresponding to the given code
language_name = next((name for name, code in cls.SUPPORTED_LANGUAGES.items() if code == language_code), "English")
return language_name.replace('_', ' ').title()
@classmethod
def add_language(cls, language_name: str, language_code: str) -> None:
"""Add a new language to the supported languages."""
language_name = language_name.strip().lower()
if language_name not in cls.SUPPORTED_LANGUAGES:
cls.SUPPORTED_LANGUAGES[language_name] = language_code
else:
print(f"Language '{language_name}' already exists.")
@classmethod
def remove_language(cls, language_name: str) -> None:
"""Remove a language from the supported languages."""
language_name = language_name.strip().lower()
if language_name in cls.SUPPORTED_LANGUAGES:
del cls.SUPPORTED_LANGUAGES[language_name]
else:
print(f"Language '{language_name}' not found.")
@classmethod
def list_supported_languages(cls) -> Dict[str, str]:
"""Get all supported languages with their codes."""
return cls.SUPPORTED_LANGUAGES
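# A quick sketch of how LanguageManager behaves, based on the lookups above:
#
#     LanguageManager.get_language_code("Tamil")     # -> "ta" (case-insensitive)
#     LanguageManager.get_language_code("klingon")   # -> "en" (fallback default)
#     LanguageManager.get_language_name("bn")        # -> "Bengali"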
class APIKeyManager:
"""Manages API keys for different LLM services with automatic rotation support."""
def __init__(self):
# Load Groq and Gemini keys securely from environment variables
self.groq_keys: List[str] = [
os.getenv(f"GORQ_API_KEY_{i}") for i in range(1, 6)
if os.getenv(f"GORQ_API_KEY_{i}")
]
self.gemini_keys: List[str] = [
os.getenv(f"GEMINI_API_KEY_{i}") for i in range(1, 6)
if os.getenv(f"GEMINI_API_KEY_{i}")
]
# Ensure keys are available
if not self.groq_keys:
raise ValueError("No valid Groq API keys found in environment variables.")
if not self.gemini_keys:
raise ValueError("No valid Gemini API keys found in environment variables.")
# Initialize key rotation indices
self.current_groq_index: int = 0
self.current_gemini_index: int = 0
def _rotate_key(self, keys: List[str], index: int) -> int:
"""Rotates the key index, returns new index."""
return (index + 1) % len(keys)
    def get_groq_key(self) -> str:
        """Return the Groq API key at the current rotation index."""
        return self.groq_keys[self.current_groq_index]
    def get_gemini_key(self) -> str:
        """Return the Gemini API key at the current rotation index."""
        return self.gemini_keys[self.current_gemini_index]
def request_with_groq(self, url: str, params: dict):
"""Makes a request using Groq API keys, rotating on rate limit errors."""
for _ in range(len(self.groq_keys)):
key = self.get_groq_key()
headers = {"Authorization": f"Bearer {key}"}
try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status() # raises HTTPError for bad responses
return response.json() # successful request, return data
except requests.exceptions.HTTPError as e:
if response.status_code == 429: # Assuming 429 as the rate limit error
print(f"Rate limit hit for Groq key: {key}. Rotating key...")
self.current_groq_index = self._rotate_key(self.groq_keys, self.current_groq_index)
                else:
                    raise  # re-raise non-rate-limit HTTP errors
        # Every key has been tried and hit the rate limit
        raise RuntimeError("All Groq API keys are rate limited; request could not be completed.")
def request_with_gemini(self, url: str, params: dict):
"""Makes a request using Gemini API keys, rotating on rate limit errors."""
for _ in range(len(self.gemini_keys)):
key = self.get_gemini_key()
headers = {"Authorization": f"Bearer {key}"}
try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status() # raises HTTPError for bad responses
return response.json() # successful request, return data
except requests.exceptions.HTTPError as e:
if response.status_code == 429: # Assuming 429 as the rate limit error
print(f"Rate limit hit for Gemini key: {key}. Rotating key...")
self.current_gemini_index = self._rotate_key(self.gemini_keys, self.current_gemini_index)
                else:
                    raise  # re-raise non-rate-limit HTTP errors
        # Every key has been tried and hit the rate limit
        raise RuntimeError("All Gemini API keys are rate limited; request could not be completed.")
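# Rotation behaviour in brief (a sketch; assumes at least two GORQ_API_KEY_* or
# GEMINI_API_KEY_* secrets are configured):
#
#     keys = APIKeyManager()
#     keys.get_groq_key()   # key at index 0
#     keys.current_groq_index = keys._rotate_key(keys.groq_keys, keys.current_groq_index)
#     keys.get_groq_key()   # key at index 1 (wraps back to 0 after the last key)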
class TranslationManager:
"""Manages translation between different languages."""
def __init__(self):
self._supported_languages = set(LanguageManager.SUPPORTED_LANGUAGES.values())
self._translation_cache: Dict[str, str] = {}
def _get_cache_key(self, text: str, source_lang: str, target_lang: str) -> str:
"""Generate a cache key for translation."""
return f"{source_lang}:{target_lang}:{text}"
def translate_text(self, text: str, source_lang: str, target_lang: str) -> str:
"""Translate text between languages with caching and error handling."""
if source_lang == target_lang:
return text
cache_key = self._get_cache_key(text, source_lang, target_lang)
if cache_key in self._translation_cache:
return self._translation_cache[cache_key]
try:
if source_lang not in self._supported_languages:
print(f"Warning: Unsupported source language {source_lang}")
source_lang = 'auto' # auto-detect if source language is unsupported
if target_lang not in self._supported_languages:
print(f"Warning: Unsupported target language {target_lang}")
return text # return original text if target language is unsupported
# Using GoogleTranslator from deep_translator
translated_text = GoogleTranslator(source=source_lang, target=target_lang).translate(text)
self._translation_cache[cache_key] = translated_text
return translated_text
except Exception as e:
print(f"Translation error: {str(e)}")
return text
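# Translations are cached per (source, target, text) triple, so repeated questions
# in the same language hit Google Translate only once. A short sketch:
#
#     tm = TranslationManager()
#     tm.translate_text("Bonjour", "fr", "en")   # network call via deep_translator
#     tm.translate_text("Bonjour", "fr", "en")   # served from the in-memory cache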
class ModelManager:
"""Manages different language models and their responses."""
def __init__(self, api_key_manager: APIKeyManager):
self.api_key_manager = api_key_manager
self.model_configs = {
"math": "llama3-70b-8192",
"job": "llama-3.2-90b-text-preview",
"general": "gemini-1.5-flash-002"
}
def get_model_response(self, question: str, question_type: str, language: str) -> str:
"""Get response from appropriate model with preprocessing."""
model_name = self.determine_model(question, question_type)
try:
if "gemini" in model_name:
response = self._get_gemini_response(question)
else:
response = self._get_groq_response(question, model_name)
return TextCleaner.clean_markdown(response)
except Exception as e:
print(f"Error with primary model: {str(e)}")
return self._get_fallback_response(question)
def determine_model(self, question: str, question_type: str) -> str:
"""Determine which model to use based on question type."""
return self.model_configs.get(question_type, self.model_configs["general"])
def _get_groq_response(self, question: str, model_name: str) -> str:
"""Get response from Groq model."""
try:
client = Groq(api_key=self.api_key_manager.get_groq_key())
system_prompt = """You are a helpful assistant.
For calculations, show step-by-step solutions,
explain each step clearly, and double-check calculations."""
response = client.chat.completions.create(
model=model_name,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": question}
],
temperature=0.7,
max_tokens=8192
)
return response.choices[0].message.content
except Exception as e:
raise Exception(f"Groq API error: {str(e)}")
def _get_gemini_response(self, question: str) -> str:
"""Get response from Gemini model."""
try:
genai.configure(api_key=self.api_key_manager.get_gemini_key())
model = genai.GenerativeModel(
model_name="gemini-1.5-flash-002",
generation_config={
"temperature": 0.7,
"max_output_tokens": 8192,
}
)
response = model.generate_content(question)
return response.text
except Exception as e:
raise Exception(f"Gemini API error: {str(e)}")
def _get_fallback_response(self, question: str) -> str:
"""Get fallback response if primary model fails."""
try:
return self._get_gemini_response(question)
except Exception:
return "I apologize, but I'm unable to process your request at the moment. Please try again later."
# OCR Integration with Question Processing
class QuestifyAI:
"""Main application class that manages multilingual question answering."""
def __init__(self):
"""Initialize QuestifyAI with key management and model components."""
self.api_key_manager = APIKeyManager()
self.translation_manager = TranslationManager()
self.language_manager = LanguageManager()
self.model_manager = ModelManager(self.api_key_manager)
# Fetch OCR API keys from Hugging Face Space secrets
self.ocr_api_keys = [
os.getenv("ocr_space_api_key1"), # Get the first OCR API key from environment variable
os.getenv("ocr_space_api_key2") # Get the second OCR API key from environment variable
]
self.api_key_index = 0 # Start with the first API key
def structure_question(self, question: str) -> str:
"""Organize user input by removing unwanted symbols and ensuring clarity."""
        # Strip markdown-style symbols; "/" is kept so fractions and units survive in math questions.
        question = re.sub(r"[#*\\]", "", question)
question = question.strip()
return question
def clean_response(self, response: str) -> str:
"""Remove any unwanted characters or formatting artifacts."""
response = response.replace("*", "").replace("•", "").replace("#", "").replace("`", "").strip()
return response
def process_question(
self,
question: str,
input_language: str,
question_type: str
) -> Tuple[str, str, str]:
"""Process a question through translation and model response pipeline."""
# Clean and structure the question
question = self.structure_question(question)
# Get the language code
language_code = self.language_manager.get_language_code(input_language)
# Translate to English if needed
english_question = (self.translation_manager.translate_text(question, language_code, "en")
if language_code != "en" else question)
# Get model response and clean it
english_answer = self.model_manager.get_model_response(
english_question, question_type, language_code
)
english_answer = self.clean_response(english_answer)
# Translate answer back to user's language if needed
translated_answer = (
self.translation_manager.translate_text(english_answer, "en", language_code)
if language_code != "en" else english_answer
)
return english_question, english_answer, translated_answer
def ocr_from_image(self, image: Image.Image) -> str:
"""
Extract text from an uploaded image using OCR.space API.
Args:
image (Image.Image): The PIL image object from which text will be extracted.
Returns:
str: The extracted text from the image.
"""
try:
api_url = "https://api.ocr.space/parse/image"
current_api_key = self.ocr_api_keys[self.api_key_index]
# Open the image and send it to the API
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format="PNG")
img_byte_arr = img_byte_arr.getvalue()
files = {'file': ('image.png', img_byte_arr, 'image/png')}
data = {'apikey': current_api_key}
# Send the request to OCR.space API
response = requests.post(api_url, files=files, data=data)
response.raise_for_status() # Check for errors in the request
# Parse the JSON response
result = response.json()
# Check if the OCR was successful
if result.get("OCRExitCode") == 1:
# Extract text from the response
extracted_text = result["ParsedResults"][0]["ParsedText"]
return extracted_text.strip()
else:
# If OCR failed, try switching the API key
self.switch_api_key()
return "Error in extracting text from image."
except requests.exceptions.RequestException as e:
# Handle request errors
print(f"Error in OCR request: {str(e)}")
self.switch_api_key()
return "Error in extracting text from image."
except Exception as e:
# Handle general errors
print(f"Error in OCR: {str(e)}")
self.switch_api_key()
return "Error in extracting text from image."
def switch_api_key(self):
"""Switch to the next OCR API key when the current one reaches its limit or fails."""
self.api_key_index = (self.api_key_index + 1) % len(self.ocr_api_keys)
print(f"Switched to API Key {self.api_key_index + 1}")
def launch_ui(self):
"""Launch Gradio web interface for QuestifyAI."""
with gr.Blocks(title="Questify AI") as app:
gr.Markdown("# Questify AI - Multilingual Question Answering System")
with gr.Row():
with gr.Column():
# Input textbox for typed questions
question_input = gr.Textbox(
label="Your Question",
placeholder="Type your question here...",
lines=5
)
# Image input for OCR
image_input = gr.Image(
label="Or Upload an Image for OCR",
type="pil" # Expect a PIL image object
)
with gr.Row():
language_input = gr.Dropdown(
label="Select Language",
choices=sorted([name.replace('_', ' ').title() for name in LanguageManager.SUPPORTED_LANGUAGES.keys()]),
value="English"
)
question_type = gr.Dropdown(
label="Question Type",
choices=["general", "math", "job"],
value="general"
)
with gr.Row():
submit_btn = gr.Button("Submit", variant="primary")
clear_btn = gr.Button("Clear")
with gr.Row():
with gr.Column():
english_question = gr.Textbox(
label="Structured English Question",
interactive=False,
lines=5
)
english_answer = gr.Textbox(
label="Answer in English",
interactive=False,
lines=5
)
translated_answer = gr.Textbox(
label="Translated Answer",
interactive=False,
lines=5
)
def handle_submit(question, language, q_type, image):
"""Handle question submission and process."""
try:
# If image is uploaded, use OCR to extract text
if image is not None:
question = self.ocr_from_image(image)
return self.process_question(question, language, q_type)
except Exception as e:
error_msg = f"Error: {str(e)}"
return error_msg, error_msg, error_msg
            def handle_clear():
                """Clear all input and output fields, including the uploaded image."""
                return "", None, "", "", ""
submit_btn.click(
handle_submit,
inputs=[question_input, language_input, question_type, image_input],
outputs=[english_question, english_answer, translated_answer]
)
            clear_btn.click(
                handle_clear,
                outputs=[question_input, image_input, english_question, english_answer, translated_answer]
            )
app.launch(debug=True)
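# Running locally (a sketch; assumes the GORQ_API_KEY_*, GEMINI_API_KEY_* and
# ocr_space_api_key* secrets are present in the environment or a .env file):
#
#     python app.py
#
# Gradio then serves the UI on its default local port (7860).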
if __name__ == "__main__":
questify = QuestifyAI()
questify.launch_ui()