import os
import re
import io
import gradio as gr
from typing import Tuple, Optional, Dict, List
from deep_translator import GoogleTranslator
from groq import Groq
import google.generativeai as genai
from dotenv import load_dotenv
import requests
import pytesseract
from PIL import Image

# Load environment variables
load_dotenv()


class OCRProcessor:
    def __init__(self):
        # Fetch the API keys from environment variables (secrets)
        self.api_keys = [
            os.getenv("ocr_space_api_key1"),  # First OCR.space API key
            os.getenv("ocr_space_api_key2")   # Second OCR.space API key
        ]

    def ocr_from_image(self, image: Image.Image) -> str:
        """
        Extract text from an uploaded image using OCR (Optical Character Recognition).

        Args:
            image (Image.Image): The PIL image object from which text will be extracted.

        Returns:
            str: The extracted text from the image.
        """
        for api_key in self.api_keys:
            try:
                # Prepare the API endpoint and data
                url = "https://api.ocr.space/parse/image"
                payload = {
                    'apikey': api_key,
                    'language': 'eng',  # Set this to the desired OCR language
                }

                # Convert the image to bytes (PIL image to byte array)
                img_byte_arr = io.BytesIO()
                image.save(img_byte_arr, format='PNG')
                img_byte_arr = img_byte_arr.getvalue()

                # Make the API request
                response = requests.post(url, data=payload, files={'file': img_byte_arr})

                # Parse the JSON response
                result = response.json()

                # Check if the response is valid and contains parsed text
                if 'ParsedResults' in result:
                    extracted_text = result['ParsedResults'][0]['ParsedText']
                    return extracted_text.strip()
                else:
                    # The OCR response is empty or contains no parsed text
                    error_message = result.get('ErrorMessage', 'Unknown error')
                    print(f"Error from OCR API: {error_message}")
                    return f"Error in OCR extraction: {error_message}"
            except Exception as e:
                # If an error occurs (e.g., network issues), log it and try the next API key
                print(f"Error using API key {api_key}: {str(e)}")

        # If both API keys fail, return a final error message
        return "Error in extracting text from image using both API keys."


class TextCleaner:
    """Handles cleaning and structuring of input text."""

    @staticmethod
    def clean_markdown(text: str) -> str:
        """Remove unnecessary markdown and formatting symbols."""
        # Normalize header levels (### or deeper becomes ##)
        text = re.sub(r'#{3,}', '##', text)
        # Normalize emphasis markers (three or more * become a single *)
        text = re.sub(r'\*{3,}', '*', text)
        # Normalize underscores (three or more _ become a single _)
        text = re.sub(r'_{3,}', '_', text)
        # Normalize strikethrough (three or more ~ become a single ~)
        text = re.sub(r'~{3,}', '~', text)
        # Remove excessive line breaks and spaces
        text = re.sub(r'\n{3,}', '\n\n', text)  # Replace more than 2 line breaks with 2
        text = re.sub(r' {2,}', ' ', text)      # Replace multiple spaces with a single space
        # Normalize code block delimiters (``` becomes `)
        text = re.sub(r'`{3,}', '`', text)
        return text.strip()

    @staticmethod
    def structure_question(text: str, llm_client) -> str:
        """Use an LLM to structure messy questions."""
        system_prompt = """Please structure the following text into clear, well-formatted questions. If there are multiple questions, separate them clearly.
        Remove any irrelevant information and improve clarity while maintaining the original meaning."""
        try:
            response = llm_client.chat.completions.create(
                model="gemini-1.5-flash-002",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": text}
                ],
                temperature=0.3,
                max_tokens=2048
            )
            # Clean the response content before returning it
            return TextCleaner.clean_markdown(response.choices[0].message.content)
        except Exception as e:
            print(f"Question structuring error: {str(e)}")
            return text


class LanguageManager:
    """Manages supported languages and their configurations."""

    SUPPORTED_LANGUAGES: Dict[str, str] = {
        # Major World Languages
        "english": "en", "spanish": "es", "french": "fr", "german": "de",
        "portuguese": "pt", "italian": "it", "russian": "ru", "arabic": "ar",
        "japanese": "ja", "korean": "ko", "chinese_simplified": "zh", "dutch": "nl",
        "polish": "pl", "turkish": "tr", "vietnamese": "vi", "thai": "th",
        "indonesian": "id", "malay": "ms", "filipino": "tl", "greek": "el",
        "hebrew": "he", "czech": "cs", "slovak": "sk", "swedish": "sv",
        "danish": "da", "finnish": "fi", "norwegian": "no", "romanian": "ro",
        "hungarian": "hu", "bulgarian": "bg", "croatian": "hr", "serbian": "sr",
        "ukrainian": "uk", "persian": "fa", "swahili": "sw",

        # Indian Languages
        "hindi": "hi", "bengali": "bn", "tamil": "ta", "telugu": "te",
        "marathi": "mr", "gujarati": "gu", "kannada": "kn", "malayalam": "ml",
        "punjabi": "pa", "odia": "or", "assamese": "as", "sanskrit": "sa",
        "maithili": "mai", "kashmiri": "ks", "dogri": "dgr", "konkani": "kok",
        "nepali": "ne", "manipuri": "mni", "sindhi": "sd", "santhali": "sat",
        "bodo": "bodo",
        "mauritian": "mfe",   # Mauritian Creole (influenced by Indian culture)
        "rajasthani": "raj",
        "sikkimese": "sik",   # Lepcha and Bhutia

        # Additional World Languages
        "azerbaijani": "az", "kazakh": "kk", "mongolian": "mn", "sinhala": "si",
        "urdu": "ur", "myanmar": "my", "khmer": "km", "lao": "lo"
    }

    @classmethod
    def get_language_code(cls, language_name: str) -> str:
        """Get language code from language name (case-insensitive)."""
        # Normalize the input so UI labels like "Chinese Simplified" match dict keys
        language_name = language_name.strip().lower().replace(' ', '_')
        return cls.SUPPORTED_LANGUAGES.get(language_name, "en")

    @classmethod
    def get_language_name(cls, language_code: str) -> str:
        """Get language name from language code."""
        # Find the language name corresponding to the given code
        language_name = next(
            (name for name, code in cls.SUPPORTED_LANGUAGES.items() if code == language_code),
            "English"
        )
        return language_name.replace('_', ' ').title()

    @classmethod
    def add_language(cls, language_name: str, language_code: str) -> None:
        """Add a new language to the supported languages."""
        language_name = language_name.strip().lower()
        if language_name not in cls.SUPPORTED_LANGUAGES:
            cls.SUPPORTED_LANGUAGES[language_name] = language_code
        else:
            print(f"Language '{language_name}' already exists.")

    @classmethod
    def remove_language(cls, language_name: str) -> None:
        """Remove a language from the supported languages."""
        language_name = language_name.strip().lower()
        if language_name in cls.SUPPORTED_LANGUAGES:
            del cls.SUPPORTED_LANGUAGES[language_name]
        else:
            print(f"Language '{language_name}' not found.")

    @classmethod
    def list_supported_languages(cls) -> Dict[str, str]:
        """Get all supported languages with their codes."""
        return cls.SUPPORTED_LANGUAGES
class APIKeyManager:
    """Manages API keys for different LLM services with automatic rotation support."""

    def __init__(self):
        # Load Groq and Gemini keys securely from environment variables
        self.groq_keys: List[str] = [
            os.getenv(f"GORQ_API_KEY_{i}") for i in range(1, 6) if os.getenv(f"GORQ_API_KEY_{i}")
        ]
        self.gemini_keys: List[str] = [
            os.getenv(f"GEMINI_API_KEY_{i}") for i in range(1, 6) if os.getenv(f"GEMINI_API_KEY_{i}")
        ]

        # Ensure keys are available
        if not self.groq_keys:
            raise ValueError("No valid Groq API keys found in environment variables.")
        if not self.gemini_keys:
            raise ValueError("No valid Gemini API keys found in environment variables.")

        # Initialize key rotation indices
        self.current_groq_index: int = 0
        self.current_gemini_index: int = 0

    def _rotate_key(self, keys: List[str], index: int) -> int:
        """Rotate the key index and return the new index."""
        return (index + 1) % len(keys)

    def get_groq_key(self) -> str:
        """Return the current Groq API key."""
        return self.groq_keys[self.current_groq_index]

    def get_gemini_key(self) -> str:
        """Return the current Gemini API key."""
        return self.gemini_keys[self.current_gemini_index]

    def request_with_groq(self, url: str, params: dict):
        """Make a request using Groq API keys, rotating on rate limit errors."""
        for _ in range(len(self.groq_keys)):
            key = self.get_groq_key()
            headers = {"Authorization": f"Bearer {key}"}
            try:
                response = requests.get(url, headers=headers, params=params)
                response.raise_for_status()  # raises HTTPError for bad responses
                return response.json()       # successful request, return data
            except requests.exceptions.HTTPError as e:
                if response.status_code == 429:  # 429 is the rate limit error
                    print(f"Rate limit hit for Groq key: {key}. Rotating key...")
                    self.current_groq_index = self._rotate_key(self.groq_keys, self.current_groq_index)
                else:
                    raise e  # for non-rate-limit errors, re-raise the exception
        raise Exception("All Groq API keys are rate limited.")

    def request_with_gemini(self, url: str, params: dict):
        """Make a request using Gemini API keys, rotating on rate limit errors."""
        for _ in range(len(self.gemini_keys)):
            key = self.get_gemini_key()
            headers = {"Authorization": f"Bearer {key}"}
            try:
                response = requests.get(url, headers=headers, params=params)
                response.raise_for_status()  # raises HTTPError for bad responses
                return response.json()       # successful request, return data
            except requests.exceptions.HTTPError as e:
                if response.status_code == 429:  # 429 is the rate limit error
                    print(f"Rate limit hit for Gemini key: {key}. Rotating key...")
Rotating key...") self.current_gemini_index = self._rotate_key(self.gemini_keys, self.current_gemini_index) else: raise e # for non-rate-limit errors, re-raise the exception class TranslationManager: """Manages translation between different languages.""" def __init__(self): self._supported_languages = set(LanguageManager.SUPPORTED_LANGUAGES.values()) self._translation_cache: Dict[str, str] = {} def _get_cache_key(self, text: str, source_lang: str, target_lang: str) -> str: """Generate a cache key for translation.""" return f"{source_lang}:{target_lang}:{text}" def translate_text(self, text: str, source_lang: str, target_lang: str) -> str: """Translate text between languages with caching and error handling.""" if source_lang == target_lang: return text cache_key = self._get_cache_key(text, source_lang, target_lang) if cache_key in self._translation_cache: return self._translation_cache[cache_key] try: if source_lang not in self._supported_languages: print(f"Warning: Unsupported source language {source_lang}") source_lang = 'auto' # auto-detect if source language is unsupported if target_lang not in self._supported_languages: print(f"Warning: Unsupported target language {target_lang}") return text # return original text if target language is unsupported # Using GoogleTranslator from deep_translator translated_text = GoogleTranslator(source=source_lang, target=target_lang).translate(text) self._translation_cache[cache_key] = translated_text return translated_text except Exception as e: print(f"Translation error: {str(e)}") return text class ModelManager: """Manages different language models and their responses.""" def __init__(self, api_key_manager: APIKeyManager): self.api_key_manager = api_key_manager self.model_configs = { "math": "llama3-70b-8192", "job": "llama-3.2-90b-text-preview", "general": "gemini-1.5-flash-002" } def get_model_response(self, question: str, question_type: str, language: str) -> str: """Get response from appropriate model with preprocessing.""" model_name = self.determine_model(question, question_type) try: if "gemini" in model_name: response = self._get_gemini_response(question) else: response = self._get_groq_response(question, model_name) return TextCleaner.clean_markdown(response) except Exception as e: print(f"Error with primary model: {str(e)}") return self._get_fallback_response(question) def determine_model(self, question: str, question_type: str) -> str: """Determine which model to use based on question type.""" return self.model_configs.get(question_type, self.model_configs["general"]) def _get_groq_response(self, question: str, model_name: str) -> str: """Get response from Groq model.""" try: client = Groq(api_key=self.api_key_manager.get_groq_key()) system_prompt = """You are a helpful assistant. 
            For calculations, show step-by-step solutions, explain each step clearly, and double-check calculations."""
            response = client.chat.completions.create(
                model=model_name,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": question}
                ],
                temperature=0.7,
                max_tokens=8192
            )
            return response.choices[0].message.content
        except Exception as e:
            raise Exception(f"Groq API error: {str(e)}")

    def _get_gemini_response(self, question: str) -> str:
        """Get a response from the Gemini model."""
        try:
            genai.configure(api_key=self.api_key_manager.get_gemini_key())
            model = genai.GenerativeModel(
                model_name="gemini-1.5-flash-002",
                generation_config={
                    "temperature": 0.7,
                    "max_output_tokens": 8192,
                }
            )
            response = model.generate_content(question)
            return response.text
        except Exception as e:
            raise Exception(f"Gemini API error: {str(e)}")

    def _get_fallback_response(self, question: str) -> str:
        """Get a fallback response if the primary model fails."""
        try:
            return self._get_gemini_response(question)
        except Exception:
            return "I apologize, but I'm unable to process your request at the moment. Please try again later."


# OCR Integration with Question Processing
class QuestifyAI:
    """Main application class that manages multilingual question answering."""

    def __init__(self):
        """Initialize QuestifyAI with key management and model components."""
        self.api_key_manager = APIKeyManager()
        self.translation_manager = TranslationManager()
        self.language_manager = LanguageManager()
        self.model_manager = ModelManager(self.api_key_manager)

        # Fetch OCR API keys from Hugging Face Space secrets
        self.ocr_api_keys = [
            os.getenv("ocr_space_api_key1"),  # First OCR API key
            os.getenv("ocr_space_api_key2")   # Second OCR API key
        ]
        self.api_key_index = 0  # Start with the first API key

    def structure_question(self, question: str) -> str:
        """Organize user input by removing unwanted symbols and ensuring clarity."""
        question = re.sub(r"[#/*\\]", "", question)
        question = question.strip()
        return question

    def clean_response(self, response: str) -> str:
        """Remove any unwanted characters or formatting artifacts."""
        response = response.replace("*", "").replace("•", "").replace("#", "").replace("`", "").strip()
        return response

    def process_question(
        self,
        question: str,
        input_language: str,
        question_type: str
    ) -> Tuple[str, str, str]:
        """Process a question through the translation and model response pipeline."""
        # Clean and structure the question
        question = self.structure_question(question)

        # Get the language code
        language_code = self.language_manager.get_language_code(input_language)

        # Translate to English if needed
        english_question = (
            self.translation_manager.translate_text(question, language_code, "en")
            if language_code != "en" else question
        )

        # Get the model response and clean it
        english_answer = self.model_manager.get_model_response(
            english_question, question_type, language_code
        )
        english_answer = self.clean_response(english_answer)

        # Translate the answer back to the user's language if needed
        translated_answer = (
            self.translation_manager.translate_text(english_answer, "en", language_code)
            if language_code != "en" else english_answer
        )

        return english_question, english_answer, translated_answer

    def ocr_from_image(self, image: Image.Image) -> str:
        """
        Extract text from an uploaded image using the OCR.space API.

        Args:
            image (Image.Image): The PIL image object from which text will be extracted.

        Returns:
            str: The extracted text from the image.
""" try: api_url = "https://api.ocr.space/parse/image" current_api_key = self.ocr_api_keys[self.api_key_index] # Open the image and send it to the API img_byte_arr = io.BytesIO() image.save(img_byte_arr, format="PNG") img_byte_arr = img_byte_arr.getvalue() files = {'file': ('image.png', img_byte_arr, 'image/png')} data = {'apikey': current_api_key} # Send the request to OCR.space API response = requests.post(api_url, files=files, data=data) response.raise_for_status() # Check for errors in the request # Parse the JSON response result = response.json() # Check if the OCR was successful if result.get("OCRExitCode") == 1: # Extract text from the response extracted_text = result["ParsedResults"][0]["ParsedText"] return extracted_text.strip() else: # If OCR failed, try switching the API key self.switch_api_key() return "Error in extracting text from image." except requests.exceptions.RequestException as e: # Handle request errors print(f"Error in OCR request: {str(e)}") self.switch_api_key() return "Error in extracting text from image." except Exception as e: # Handle general errors print(f"Error in OCR: {str(e)}") self.switch_api_key() return "Error in extracting text from image." def switch_api_key(self): """Switch to the next OCR API key when the current one reaches its limit or fails.""" self.api_key_index = (self.api_key_index + 1) % len(self.ocr_api_keys) print(f"Switched to API Key {self.api_key_index + 1}") def launch_ui(self): """Launch Gradio web interface for QuestifyAI.""" with gr.Blocks(title="Questify AI") as app: gr.Markdown("# Questify AI - Multilingual Question Answering System") with gr.Row(): with gr.Column(): # Input textbox for typed questions question_input = gr.Textbox( label="Your Question", placeholder="Type your question here...", lines=5 ) # Image input for OCR image_input = gr.Image( label="Or Upload an Image for OCR", type="pil" # Expect a PIL image object ) with gr.Row(): language_input = gr.Dropdown( label="Select Language", choices=sorted([name.replace('_', ' ').title() for name in LanguageManager.SUPPORTED_LANGUAGES.keys()]), value="English" ) question_type = gr.Dropdown( label="Question Type", choices=["general", "math", "job"], value="general" ) with gr.Row(): submit_btn = gr.Button("Submit", variant="primary") clear_btn = gr.Button("Clear") with gr.Row(): with gr.Column(): english_question = gr.Textbox( label="Structured English Question", interactive=False, lines=5 ) english_answer = gr.Textbox( label="Answer in English", interactive=False, lines=5 ) translated_answer = gr.Textbox( label="Translated Answer", interactive=False, lines=5 ) def handle_submit(question, language, q_type, image): """Handle question submission and process.""" try: # If image is uploaded, use OCR to extract text if image is not None: question = self.ocr_from_image(image) return self.process_question(question, language, q_type) except Exception as e: error_msg = f"Error: {str(e)}" return error_msg, error_msg, error_msg def handle_clear(): """Clear all input and output fields.""" return "", "", "", "", "" submit_btn.click( handle_submit, inputs=[question_input, language_input, question_type, image_input], outputs=[english_question, english_answer, translated_answer] ) clear_btn.click( handle_clear, outputs=[english_question, english_answer, translated_answer] ) app.launch(debug=True) if __name__ == "__main__": questify = QuestifyAI() questify.launch_ui()