Spaces:
Sleeping
Sleeping
| # reverted to code v29 | |
| import gradio as gr | |
| import torch | |
| from transformers import AutoTokenizer, AutoModelForSeq2SeqLM | |
| import PyPDF2 | |
| import docx | |
| from docx import Document | |
| import io | |
| import tempfile | |
| import os | |
| from typing import Optional, Tuple | |
| import logging | |
| import spaces | |
| import time | |
# Set up logging: configure the root logger at INFO level once, at import time,
# and create the module-level logger used by every function in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| # Import IndicProcessor | |
| from IndicTransToolkit.processor import IndicProcessor | |
# Authentication credentials from environment variables (Space secrets).
# The fallbacks are kept for backward compatibility, but they are well-known
# placeholder values: a deployment that forgets to set the secrets is
# effectively unprotected, so say so loudly instead of accepting them silently.
# NOTE(review): ``USERNAME`` is also set by some operating systems/shells;
# confirm the deployment environment really supplies the intended value.
_DEFAULT_USERNAME = "admin"
_DEFAULT_PASSWORD = "password123"
VALID_USERNAME = os.getenv("USERNAME", _DEFAULT_USERNAME)
VALID_PASSWORD = os.getenv("PASSWORD", _DEFAULT_PASSWORD)
if os.getenv("USERNAME") is None or os.getenv("PASSWORD") is None:
    logging.getLogger(__name__).warning(
        "USERNAME/PASSWORD environment variables are not both set; "
        "falling back to insecure default credentials."
    )

# Session management: ids of sessions that logged in and have not logged out.
# The set lives in process memory only, so it is emptied on every restart.
authenticated_sessions = set()
def authenticate(username: str, password: str) -> tuple:
    """Check the supplied credentials and open a session when they match.

    Args:
        username: User name typed into the login form (``None`` is treated as empty).
        password: Password typed into the login form (``None`` is treated as empty).

    Returns:
        ``(True, session_id)`` on success, where ``session_id`` has been added to
        ``authenticated_sessions``; ``(False, None)`` otherwise.
    """
    import hmac
    import secrets

    username = username or ""
    password = password or ""

    # Constant-time comparisons so response timing does not leak how much of a
    # credential matched. Both checks always run for the same reason.
    user_ok = hmac.compare_digest(username.encode("utf-8"), VALID_USERNAME.encode("utf-8"))
    password_ok = hmac.compare_digest(password.encode("utf-8"), VALID_PASSWORD.encode("utf-8"))

    if user_ok and password_ok:
        # The session id is the only thing guarding the translate endpoints, so
        # it must be unguessable (the previous time+hash scheme was not).
        session_id = f"session_{secrets.token_urlsafe(32)}"
        authenticated_sessions.add(session_id)
        logger.info(f"Successful login for user: {username}")
        return True, session_id

    logger.warning(f"Failed login attempt for user: {username}")
    return False, None
def is_authenticated(session_id: str) -> bool:
    """Report whether *session_id* belongs to a currently logged-in session."""
    active_sessions = authenticated_sessions
    return session_id in active_sessions
def logout_session(session_id: str):
    """Forget *session_id*; unknown ids are ignored without logging."""
    if session_id not in authenticated_sessions:
        return
    authenticated_sessions.remove(session_id)
    logger.info(f"Session logged out: {session_id}")
class IndicTrans2Translator:
    """Translate between English and Indic languages with the IndicTrans2 1B checkpoints.

    Two tokenizer/model pairs are kept in memory (English->Indic and
    Indic->English). Indic->Indic requests are pivoted through English.
    The public methods take the short language keys of the module-level
    ``LANGUAGE_SCRIPT_MAPPING`` (e.g. ``"en"``, ``"hin"``).
    """

    def __init__(self):
        """Create the translator and load both models immediately."""
        self.en_indic_model = None
        self.en_indic_tokenizer = None
        self.indic_en_model = None
        self.indic_en_tokenizer = None
        self.ip = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.load_models()

    # ------------------------------------------------------------------ loading

    @staticmethod
    def _select_dtype(cuda_available: bool):
        """Pick bfloat16/float16 on GPU (depending on support) and float32 on CPU."""
        if not cuda_available:
            logger.info("Using float32 precision for CPU")
            return torch.float32
        try:
            torch_dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
            logger.info(f"Using {torch_dtype} precision for optimal GPU performance")
        except Exception:
            # The capability probe itself failed; float16 is the safe GPU default.
            torch_dtype = torch.float16
            logger.info("Using float16 precision")
        return torch_dtype

    def _load_checkpoint(self, checkpoint: str, torch_dtype, use_device_map: bool):
        """Load one tokenizer/model pair, place the model and switch it to eval mode."""
        tokenizer = AutoTokenizer.from_pretrained(checkpoint, trust_remote_code=True)
        if use_device_map:
            model = AutoModelForSeq2SeqLM.from_pretrained(
                checkpoint,
                trust_remote_code=True,
                torch_dtype=torch_dtype,
                low_cpu_mem_usage=True,
                device_map="auto",  # let accelerate place the weights
            )
        else:
            model = AutoModelForSeq2SeqLM.from_pretrained(
                checkpoint,
                trust_remote_code=True,
                torch_dtype=torch_dtype,
                low_cpu_mem_usage=True,
            )
            model = model.to(self.device)
        model.eval()
        return tokenizer, model

    def load_models(self):
        """Load the IndicProcessor and both IndicTrans2 tokenizer/model pairs.

        Raises:
            Exception: any error from the underlying loaders is logged and re-raised.
        """
        try:
            logger.info("Loading IndicTrans2 models with HF Spaces GPU optimizations...")
            cuda_available = torch.cuda.is_available()
            if cuda_available:
                logger.info(f"CUDA available: {cuda_available}")
                logger.info(f"CUDA device: {torch.cuda.get_device_name(torch.cuda.current_device())}")
                logger.info(f"CUDA device count: {torch.cuda.device_count()}")
            else:
                logger.warning("CUDA not available, using CPU")

            self.ip = IndicProcessor(inference=True)
            logger.info("IndicProcessor loaded successfully!")

            # device_map="auto" needs the optional `accelerate` package.
            try:
                import accelerate  # noqa: F401
                accelerate_available = True
                logger.info("Accelerate available, using device_map for optimal GPU utilization")
            except ImportError:
                accelerate_available = False
                logger.info("Accelerate not available, using manual device placement")
            use_device_map = accelerate_available and cuda_available

            torch_dtype = self._select_dtype(cuda_available)

            logger.info("Loading English to Indic model...")
            self.en_indic_tokenizer, self.en_indic_model = self._load_checkpoint(
                "ai4bharat/indictrans2-en-indic-1B", torch_dtype, use_device_map
            )

            logger.info("Loading Indic to English model...")
            self.indic_en_tokenizer, self.indic_en_model = self._load_checkpoint(
                "ai4bharat/indictrans2-indic-en-1B", torch_dtype, use_device_map
            )

            if cuda_available:
                torch.backends.cudnn.benchmark = True
                try:
                    # torch.compile is skipped with device_map because the two can conflict.
                    if not use_device_map:
                        self.en_indic_model = torch.compile(self.en_indic_model, mode="reduce-overhead")
                        self.indic_en_model = torch.compile(self.indic_en_model, mode="reduce-overhead")
                        logger.info("Models compiled with torch.compile for faster inference")
                    else:
                        logger.info("Skipping torch.compile (using device_map)")
                except Exception as e:
                    logger.info(f"torch.compile not available or failed: {e}")

            logger.info("Models loaded successfully with HF Spaces optimizations!")

            if cuda_available:
                memory_allocated = torch.cuda.memory_allocated(0) / 1024**3  # GB
                memory_reserved = torch.cuda.memory_reserved(0) / 1024**3  # GB
                logger.info(f"GPU Memory - Allocated: {memory_allocated:.2f}GB, Reserved: {memory_reserved:.2f}GB")
        except Exception as e:
            logger.error(f"Error loading models: {str(e)}")
            raise

    # ------------------------------------------------------- sentence handling

    def split_into_sentences(self, text: str) -> Tuple[list, list]:
        """Split *text* into sentences and record where paragraphs end.

        Paragraphs are separated by blank lines; sentences end at ``.``, ``!``
        or ``?`` followed by whitespace.

        Returns:
            ``(sentences, markers)``: two lists of equal length. Each marker is
            a dict with ``'is_paragraph_end'`` (True for the last sentence of
            every paragraph except the final one) and ``'original_sentence'``.
        """
        import re

        # Drop empty paragraphs up front so "last paragraph" means the last
        # one with content; otherwise trailing blank lines would put a
        # paragraph break after the final sentence.
        paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]
        last_para_idx = len(paragraphs) - 1

        sentence_list = []
        paragraph_markers = []
        for para_idx, paragraph in enumerate(paragraphs):
            sentences = [s.strip() for s in re.split(r'(?<=[.!?])\s+', paragraph) if s.strip()]
            last_sent_idx = len(sentences) - 1
            for sent_idx, sentence in enumerate(sentences):
                sentence_list.append(sentence)
                paragraph_markers.append({
                    'is_paragraph_end': sent_idx == last_sent_idx and para_idx != last_para_idx,
                    'original_sentence': sentence,
                })
        return sentence_list, paragraph_markers

    def reconstruct_formatting(self, translated_sentences: list, paragraph_markers: list) -> str:
        """Re-join translated sentences using the paragraph markers from the source.

        Falls back to a single space-joined string when the two lists differ in length.
        """
        if len(translated_sentences) != len(paragraph_markers):
            return ' '.join(translated_sentences)

        last_idx = len(translated_sentences) - 1
        pieces = []
        for idx, (translation, marker) in enumerate(zip(translated_sentences, paragraph_markers)):
            pieces.append(translation)
            if marker['is_paragraph_end']:
                pieces.append('\n\n')
            elif idx < last_idx:
                pieces.append(' ')
        return ''.join(pieces)

    # ------------------------------------------------------------- translation

    def translate_text(self, text: str, source_lang: str, target_lang: str) -> str:
        """Translate *text*, keeping paragraph breaks where possible.

        Args:
            text: Source text.
            source_lang: Short source key of ``LANGUAGE_SCRIPT_MAPPING``.
            target_lang: Short target key of ``LANGUAGE_SCRIPT_MAPPING``.

        Returns:
            The translation, the unchanged text when both languages are equal,
            or a human-readable message on unsupported languages, empty input
            or failure (this method never raises).
        """
        try:
            source_lang_code = LANGUAGE_SCRIPT_MAPPING.get(source_lang)
            target_lang_code = LANGUAGE_SCRIPT_MAPPING.get(target_lang)
            if not source_lang_code or not target_lang_code:
                return f"Unsupported language: {source_lang} or {target_lang}"

            if source_lang == target_lang:
                return text

            logger.info(f"Translating from {source_lang} ({source_lang_code}) to {target_lang} ({target_lang_code})")

            if '\n' not in text and len(text.split('.')) <= 2:
                # Short single-sentence input: translate as one unit.
                stripped = text.strip()
                input_sentences = [stripped] if stripped else []
                paragraph_markers = None
            else:
                input_sentences, paragraph_markers = self.split_into_sentences(text)

            if not input_sentences:
                return "No valid text found to translate."

            if source_lang != "en" and target_lang != "en":
                # Neither side is English: pivot through English.
                logger.info(f"Performing Indic-to-Indic translation via English: {source_lang} -> English -> {target_lang}")
                intermediate_text = self.translate_via_english(input_sentences, source_lang, "en", paragraph_markers)
                if paragraph_markers:
                    # Re-split so the second hop sees the same paragraph structure.
                    intermediate_sentences, intermediate_markers = self.split_into_sentences(intermediate_text)
                    return self.translate_via_english(intermediate_sentences, "en", target_lang, intermediate_markers)
                return self.translate_via_english([intermediate_text], "en", target_lang, None)

            if source_lang == "en":
                tokenizer, model = self.en_indic_tokenizer, self.en_indic_model
            else:
                tokenizer, model = self.indic_en_tokenizer, self.indic_en_model

            return self.perform_direct_translation(
                input_sentences, source_lang_code, target_lang_code, tokenizer, model, paragraph_markers
            )
        except Exception as e:
            logger.exception(f"Translation error: {str(e)}")
            return f"Error during translation: {str(e)}"

    def translate_via_english(self, input_sentences: list, source_lang: str, target_lang: str,
                              paragraph_markers: Optional[list]) -> str:
        """Run one hop of a pivot translation (either side of the hop must be English)."""
        source_lang_code = LANGUAGE_SCRIPT_MAPPING.get(source_lang)
        target_lang_code = LANGUAGE_SCRIPT_MAPPING.get(target_lang)
        if source_lang == "en":
            tokenizer, model = self.en_indic_tokenizer, self.en_indic_model
        else:
            tokenizer, model = self.indic_en_tokenizer, self.indic_en_model
        return self.perform_direct_translation(
            input_sentences, source_lang_code, target_lang_code, tokenizer, model, paragraph_markers
        )

    @staticmethod
    def _choose_batch_size(avg_sentence_length: float) -> int:
        """Return 1, 2 or 4 sentences per batch; longer sentences get smaller batches."""
        # The larger threshold must be tested first, otherwise it can never match.
        if avg_sentence_length > 200:
            return 1
        if avg_sentence_length > 100:
            return 2
        return 4

    def _translate_batch(self, sentences: list, source_lang_code: str, target_lang_code: str,
                         tokenizer, model, padding="longest", pass_special_token_ids: bool = True) -> list:
        """Preprocess, tokenize, generate, decode and postprocess one batch of sentences."""
        batch = self.ip.preprocess_batch(sentences, src_lang=source_lang_code, tgt_lang=target_lang_code)
        inputs = tokenizer(
            batch,
            truncation=True,
            padding=padding,
            max_length=256,
            return_tensors="pt",
        ).to(self.device)

        generate_kwargs = dict(do_sample=False, max_length=256, num_beams=1, use_cache=True)
        if pass_special_token_ids:
            generate_kwargs["pad_token_id"] = tokenizer.pad_token_id
            generate_kwargs["eos_token_id"] = tokenizer.eos_token_id

        # inference_mode already disables gradient tracking; no extra no_grad needed.
        with torch.inference_mode():
            outputs = model.generate(**inputs, **generate_kwargs)

        generated_tokens = tokenizer.batch_decode(
            outputs,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True,
        )
        return self.ip.postprocess_batch(generated_tokens, lang=target_lang_code)

    def perform_direct_translation(self, input_sentences: list, source_lang_code: str, target_lang_code: str,
                                   tokenizer, model, paragraph_markers: Optional[list]) -> str:
        """Translate sentences in batches with the given model and re-assemble the text.

        A batch that fails is retried sentence by sentence; a sentence that
        still fails is replaced by a ``[Translation failed: ...]`` placeholder
        so the sentence count (and therefore paragraph layout) is preserved.

        Returns:
            The translated text, or ``"Translation failed"`` when nothing was produced.
        """
        total = len(input_sentences)
        avg_sentence_length = sum(len(s.split()) for s in input_sentences) / total if total else 0
        batch_size = self._choose_batch_size(avg_sentence_length)
        logger.info(f"Using batch size {batch_size} for average sentence length {avg_sentence_length:.1f} words")

        all_translations = []
        for start in range(0, total, batch_size):
            batch_sentences = input_sentences[start:start + batch_size]
            try:
                all_translations.extend(
                    self._translate_batch(batch_sentences, source_lang_code, target_lang_code, tokenizer, model)
                )
            except Exception as e:
                logger.error(f"Translation error in batch {start // batch_size + 1}: {str(e)}")
                for single_sentence in batch_sentences:
                    try:
                        all_translations.extend(
                            self._translate_batch(
                                [single_sentence], source_lang_code, target_lang_code, tokenizer, model,
                                padding=False, pass_special_token_ids=False,
                            )
                        )
                    except Exception as single_e:
                        logger.error(f"Failed to translate sentence: {str(single_e)}")
                        all_translations.append(f"[Translation failed: {single_sentence[:50]}...]")
                continue

            if total > 20:
                done = start + len(batch_sentences)
                logger.info(f"Translation progress: {int(done / total * 100)}% ({done}/{total} sentences)")

        if paragraph_markers and len(all_translations) == len(paragraph_markers):
            return self.reconstruct_formatting(all_translations, paragraph_markers)
        return ' '.join(all_translations) if all_translations else "Translation failed"
# Display name (as shown in the UI dropdowns) -> short internal language key.
LANGUAGES = {
    "English": "en",
    "Assamese": "asm",
    "Bengali": "ben",
    "Bodo": "brx",
    "Dogri": "doi",
    "Gujarati": "guj",
    "Hindi": "hin",
    "Kannada": "kan",
    "Kashmiri": "kas",
    "Konkani": "gom",
    "Maithili": "mai",
    "Malayalam": "mal",
    "Manipuri": "mni",
    "Marathi": "mar",
    "Nepali": "nep",
    "Oriya": "ory",
    "Punjabi": "pan",
    "Sanskrit": "san",
    "Santali": "sat",
    "Sindhi": "snd",
    "Tamil": "tam",
    "Telugu": "tel",
    "Urdu": "urd",
}

# Short internal key -> IndicTrans2 language_Script tag passed to IndicProcessor.
# Every value in LANGUAGES must have an entry here.
LANGUAGE_SCRIPT_MAPPING = {
    "en": "eng_Latn",
    "asm": "asm_Beng",
    "ben": "ben_Beng",
    "brx": "brx_Deva",
    "doi": "doi_Deva",
    "guj": "guj_Gujr",
    "hin": "hin_Deva",
    "kan": "kan_Knda",
    "kas": "kas_Arab",
    "gom": "gom_Deva",
    "mai": "mai_Deva",
    "mal": "mal_Mlym",
    "mni": "mni_Beng",
    "mar": "mar_Deva",
    # IndicTrans2 tags Nepali as npi_Deva; "nep_Deva" is not a known tag.
    # The internal key stays "nep" so LANGUAGES keeps working.
    "nep": "npi_Deva",
    "ory": "ory_Orya",
    "pan": "pan_Guru",
    "san": "san_Deva",
    "sat": "sat_Olck",
    "snd": "snd_Arab",
    "tam": "tam_Taml",
    "tel": "tel_Telu",
    "urd": "urd_Arab",
}
def extract_text_from_pdf(file_path: str) -> str:
    """Extract the text of a PDF as paragraphs separated by blank lines.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The non-empty paragraphs of all pages joined with ``'\\n\\n'``, or a
        string starting with ``"Error reading PDF:"`` when the file cannot be
        read (callers test for that prefix, so no exception is raised).
    """
    try:
        paragraphs = []
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page in pdf_reader.pages:
                # Guard against a missing result for pages without a text
                # layer, so one such page does not abort the whole document.
                page_text = page.extract_text() or ""
                paragraphs.extend(p.strip() for p in page_text.split('\n\n') if p.strip())
        return '\n\n'.join(paragraphs)
    except Exception as e:
        logger.error(f"Error extracting text from PDF: {str(e)}")
        return f"Error reading PDF: {str(e)}"
def _snapshot_run_format(run) -> dict:
    """Capture the character formatting of one DOCX run as a plain dict."""
    run_format = {
        'text': run.text,
        'bold': run.bold,
        'italic': run.italic,
        'underline': run.underline,
        'font_name': run.font.name,
        'font_size': run.font.size,
        'font_color': None,
        'highlight_color': None,
    }
    # Colour lookups are best-effort: a failure leaves the value at None
    # rather than aborting the extraction.
    try:
        if run.font.color and run.font.color.rgb:
            run_format['font_color'] = run.font.color.rgb
    except Exception as e:
        logger.debug(f"Could not read font color: {e}")
    try:
        if run.font.highlight_color:
            run_format['highlight_color'] = run.font.highlight_color
    except Exception as e:
        logger.debug(f"Could not read highlight color: {e}")
    return run_format


def extract_text_from_docx(file_path: str) -> Tuple[str, list]:
    """Extract text and per-paragraph formatting from a DOCX file.

    Args:
        file_path: Path of the DOCX file to read.

    Returns:
        ``(text, formatting_info)``. ``text`` holds the non-empty paragraphs
        joined with ``'\\n\\n'``; ``formatting_info`` has one dict per such
        paragraph (alignment, indents, spacing and a ``'runs'`` list of run
        formats). On failure returns ``("Error reading DOCX: ...", [])``.
    """
    try:
        doc = Document(file_path)
        paragraphs = []
        formatting_info = []
        for para in doc.paragraphs:
            text = para.text.strip()
            if not text:
                continue  # empty paragraphs carry nothing to translate
            paragraphs.append(text)
            layout = para.paragraph_format
            formatting_info.append({
                'alignment': para.alignment,
                'left_indent': layout.left_indent,
                'right_indent': layout.right_indent,
                'first_line_indent': layout.first_line_indent,
                'space_before': layout.space_before,
                'space_after': layout.space_after,
                'line_spacing': layout.line_spacing,
                # Whitespace-only runs are skipped so they do not skew the
                # "dominant formatting" vote when the document is rebuilt.
                'runs': [_snapshot_run_format(run) for run in para.runs if run.text.strip()],
            })
        return '\n\n'.join(paragraphs), formatting_info
    except Exception as e:
        logger.error(f"Error extracting text from DOCX: {str(e)}")
        return f"Error reading DOCX: {str(e)}", []
def create_formatted_docx(translated_paragraphs: list, formatting_info: list, filename: str) -> str:
    """Write translated paragraphs to *filename*, re-applying the captured formatting.

    Paragraph-level settings are copied one to one. Run-level settings are
    reduced to one run per paragraph: bold/italic/underline are applied when
    more than half of the source runs had them, and font name, size and color
    use the most frequent source value. If anything fails, the plain-text
    writer ``create_docx_with_text`` is used instead.

    Returns:
        *filename* on success, otherwise whatever the fallback writer returns.
    """
    try:
        document = Document()

        # Drop the initial paragraph of the new document, if there is one.
        initial_paragraphs = document.paragraphs
        if initial_paragraphs:
            first = initial_paragraphs[0]
            first._element.getparent().remove(first._element)

        layout_fields = (
            'left_indent',
            'right_indent',
            'first_line_indent',
            'space_before',
            'space_after',
            'line_spacing',
        )

        for para_text, para_format in zip(translated_paragraphs, formatting_info):
            if not para_text.strip():
                continue

            paragraph = document.add_paragraph()

            # Paragraph-level formatting
            try:
                alignment = para_format.get('alignment')
                if alignment is not None:
                    paragraph.alignment = alignment
                for field in layout_fields:
                    value = para_format.get(field)
                    if value is not None:
                        setattr(paragraph.paragraph_format, field, value)
            except Exception as e:
                logger.warning(f"Could not apply some paragraph formatting: {e}")

            # Run-level formatting
            runs_info = para_format.get('runs', [])
            if not runs_info:
                paragraph.add_run(para_text)
                continue

            run_total = len(runs_info)
            bold_votes = len([r for r in runs_info if r.get('bold')])
            italic_votes = len([r for r in runs_info if r.get('italic')])
            underline_votes = len([r for r in runs_info if r.get('underline')])

            font_names = [r.get('font_name') for r in runs_info if r.get('font_name')]
            font_sizes = [r.get('font_size') for r in runs_info if r.get('font_size')]
            font_colors = [r.get('font_color') for r in runs_info if r.get('font_color')]

            run = paragraph.add_run(para_text)
            try:
                if 2 * bold_votes > run_total:
                    run.bold = True
                if 2 * italic_votes > run_total:
                    run.italic = True
                if 2 * underline_votes > run_total:
                    run.underline = True
                if font_names:
                    run.font.name = max(set(font_names), key=font_names.count)
                if font_sizes:
                    run.font.size = max(set(font_sizes), key=font_sizes.count)
                if font_colors:
                    run.font.color.rgb = max(set(font_colors), key=font_colors.count)
            except Exception as e:
                logger.warning(f"Could not apply some formatting: {e}")

        document.save(filename)
        return filename
    except Exception as e:
        logger.error(f"Error creating formatted DOCX: {str(e)}")
        return create_docx_with_text('\n\n'.join(translated_paragraphs), filename)
def create_docx_with_text(text: str, filename: str) -> str:
    """Write *text* to a plain DOCX file, one paragraph per blank-line-separated chunk.

    Single newlines inside a chunk are flattened to spaces. Returns *filename*
    on success and ``None`` when the document could not be created or saved.
    """
    try:
        document = Document()
        for chunk in text.split('\n\n'):
            if not chunk.strip():
                continue
            flattened = chunk.replace('\n', ' ').strip()
            document.add_paragraph(flattened)
        document.save(filename)
    except Exception as e:
        logger.error(f"Error creating DOCX: {str(e)}")
        return None
    return filename
def translate_text_input(text: str, source_lang: str, target_lang: str, session_id: str = "") -> str:
    """UI handler for the text tab: validate the request, then translate.

    ``source_lang`` and ``target_lang`` are display names from ``LANGUAGES``.
    Returns the translation or a user-facing message describing what is wrong.
    Identical source and target are allowed; the translator then returns the
    input unchanged.
    """
    if not is_authenticated(session_id):
        return "❌ Please log in to use this feature."

    if not text.strip():
        return "Please enter some text to translate."

    source_code, target_code = LANGUAGES.get(source_lang), LANGUAGES.get(target_lang)
    if not (source_code and target_code):
        return "Invalid language selection."

    return translator.translate_text(text, source_code, target_code)
def _format_duration(total_seconds: float) -> str:
    """Render a duration as ``"Xm Ys"`` (or ``"Ys"`` when under a minute)."""
    minutes = int(total_seconds // 60)
    seconds = int(total_seconds % 60)
    return f"{minutes}m {seconds}s" if minutes > 0 else f"{seconds}s"


def translate_document(file, source_lang: str, target_lang: str, session_id: str = "") -> Tuple[Optional[str], str]:
    """UI handler for the document tab: translate a PDF/DOCX and write a DOCX.

    Args:
        file: The upload from ``gr.File``. With ``type="filepath"`` this is a
            plain path string; objects exposing ``.name`` are accepted too.
        source_lang: Display name of the source language (key of ``LANGUAGES``).
        target_lang: Display name of the target language (key of ``LANGUAGES``).
        session_id: Session id obtained at login.

    Returns:
        ``(output_path, status_message)``; ``output_path`` is ``None`` whenever
        no output file was produced.
    """
    if not is_authenticated(session_id):
        return None, "❌ Please log in to use this feature."
    if file is None:
        return None, "Please upload a document."

    source_code = LANGUAGES.get(source_lang)
    target_code = LANGUAGES.get(target_lang)
    if not source_code or not target_code:
        return None, "Invalid language selection."

    # A "filepath" upload is a str and has no `.name`; accept both shapes.
    file_path = file if isinstance(file, str) else getattr(file, "name", None)
    if not file_path:
        return None, "Please upload a document."

    start_time = time.time()
    try:
        file_extension = os.path.splitext(file_path)[1].lower()
        formatting_info = None
        logger.info(f"Starting document translation: {source_lang} → {target_lang}")

        if file_extension == '.pdf':
            text = extract_text_from_pdf(file_path)
        elif file_extension == '.docx':
            text, formatting_info = extract_text_from_docx(file_path)
        else:
            return None, "Unsupported file format. Please upload PDF or DOCX files only."

        # Match the extractors' exact error prefixes so a document whose own
        # text begins with "Error" is not mistaken for a read failure.
        if text.startswith(("Error reading PDF:", "Error reading DOCX:")):
            return None, text
        if not text.strip():
            return None, "No extractable text found in the document."

        word_count = len(text.split())
        char_count = len(text)
        logger.info(f"Document stats: {word_count} words, {char_count} characters")

        translate_start = time.time()
        translated_text = translator.translate_text(text, source_code, target_code)
        logger.info(f"Core translation took: {time.time() - translate_start:.2f} seconds")

        # translate_text reports failures as a message; do not ship it as a "translation".
        if translated_text.startswith("Error during translation:"):
            return None, f"❌ {translated_text}"

        # One fresh directory per request: keeps the friendly download name
        # while preventing concurrent users from overwriting each other.
        output_filename = f"translated_{os.path.splitext(os.path.basename(file_path))[0]}.docx"
        output_path = os.path.join(tempfile.mkdtemp(prefix="indictrans2_"), output_filename)

        formatting_preserved = False
        if formatting_info and file_extension == '.docx':
            translated_paragraphs = translated_text.split('\n\n')
            if len(translated_paragraphs) == len(formatting_info):
                written = create_formatted_docx(translated_paragraphs, formatting_info, output_path)
                formatting_preserved = written is not None
            else:
                logger.warning(
                    f"Paragraph count mismatch: {len(translated_paragraphs)} vs {len(formatting_info)}, using fallback"
                )
                written = create_docx_with_text(translated_text, output_path)
        else:
            written = create_docx_with_text(translated_text, output_path)

        if not written or not os.path.exists(output_path):
            return None, "❌ Translation finished but the output document could not be written."

        total_duration = time.time() - start_time
        time_str = _format_duration(total_duration)

        if word_count > 0 and total_duration > 0:
            words_per_minute = int((word_count / total_duration) * 60)
            speed_info = f" • Speed: {words_per_minute} words/min"
        else:
            speed_info = ""

        if source_code == target_code:
            translation_type = "Document processed"
        elif source_code == "en" or target_code == "en":
            translation_type = "Direct translation"
        else:
            translation_type = "Indic-to-Indic translation (via English)"

        # Only claim preserved formatting when the formatted writer was used.
        if formatting_preserved:
            formatting_note = "📁 Original formatting preserved in output file."
        else:
            formatting_note = "📁 Output saved as DOCX with paragraph structure only."

        status_message = (
            f"✅ Translation completed successfully!\n"
            f"⏱️ Time taken: {time_str}\n"
            f"📄 Document: {word_count} words, {char_count} characters\n"
            f"🔄 Type: {translation_type}{speed_info}\n"
            f"{formatting_note}"
        )
        logger.info(f"Document translation completed in {total_duration:.2f} seconds ({time_str})")
        return output_path, status_message
    except Exception as e:
        time_str = _format_duration(time.time() - start_time)
        logger.error(f"Document translation error after {time_str}: {str(e)}")
        return None, f"❌ Error during document translation (after {time_str}): {str(e)}"
# Initialize translator (loads both models; this is the slow part of start-up)
print("Initializing IndicTrans2 Translator with IndicTransToolkit...")
translator = IndicTrans2Translator()

# Markdown shown in the UI. Kept at column 0 with explicit blank lines so the
# headings, rule and lists render as separate blocks.
LOGIN_HEADER_MD = """
# 🔐 IndicTrans2 Translator - Authentication Required

Please enter your credentials to access the translation tool.
"""

LOGIN_FOOTER_MD = """
---

**For Administrators:**

- Set environment secrets `USERNAME` and `PASSWORD` to configure credentials
- Secrets are encrypted and secure in HuggingFace Spaces

**Features:**

- 🔒 Secure authentication system
- 🌍 Support for 22+ Indian languages
- 📄 Document translation with formatting preservation
- 🔥 High-quality translation using IndicTrans2 models
"""

MAIN_HEADER_MD = """
# IndicTrans2 Translation Tool

Translate text between English and Indian languages using the IndicTrans2 1B model with IndicTransToolkit for optimal quality.
"""

LANGUAGE_CHOICES = list(LANGUAGES.keys())

# Create the app with proper authentication
with gr.Blocks(title="IndicTrans2 Translator", theme=gr.themes.Soft()) as demo:
    # Per-browser-session state holding the session id issued at login
    session_state = gr.State("")

    # Login interface (visible by default)
    with gr.Column(visible=True) as login_column:
        gr.Markdown(LOGIN_HEADER_MD)
        with gr.Row():
            with gr.Column(scale=1):
                pass  # Empty column for centering
            with gr.Column(scale=2):
                with gr.Group():
                    gr.Markdown("### Login")
                    username_input = gr.Textbox(
                        label="Username",
                        placeholder="Enter username",
                        type="text",
                    )
                    password_input = gr.Textbox(
                        label="Password",
                        placeholder="Enter password",
                        type="password",
                    )
                    login_btn = gr.Button("Login", variant="primary", size="lg")
                    login_status = gr.Markdown("")
            with gr.Column(scale=1):
                pass  # Empty column for centering
        gr.Markdown(LOGIN_FOOTER_MD)

    # Main translator interface (hidden until login succeeds)
    with gr.Column(visible=False) as main_column:
        gr.Markdown(MAIN_HEADER_MD)
        with gr.Tabs():
            # Text Translation Tab
            with gr.TabItem("Text Translation"):
                with gr.Row():
                    with gr.Column():
                        text_input = gr.Textbox(
                            label="Input Text",
                            placeholder="Enter text to translate...",
                            lines=5,
                        )
                        with gr.Row():
                            source_lang_text = gr.Dropdown(
                                choices=LANGUAGE_CHOICES,
                                label="Source Language",
                                value="English",
                            )
                            target_lang_text = gr.Dropdown(
                                choices=LANGUAGE_CHOICES,
                                label="Target Language",
                                value="Hindi",
                            )
                        translate_text_btn = gr.Button("Translate Text", variant="primary")
                    with gr.Column():
                        text_output = gr.Textbox(
                            label="Translated Text",
                            lines=5,
                            interactive=False,
                        )

            # Document Translation Tab
            with gr.TabItem("Document Translation"):
                with gr.Row():
                    with gr.Column():
                        # type="filepath": the handler receives a plain path string
                        file_input = gr.File(
                            label="Upload Document",
                            file_types=[".pdf", ".docx"],
                            type="filepath",
                        )
                        with gr.Row():
                            source_lang_doc = gr.Dropdown(
                                choices=LANGUAGE_CHOICES,
                                label="Source Language",
                                value="English",
                            )
                            target_lang_doc = gr.Dropdown(
                                choices=LANGUAGE_CHOICES,
                                label="Target Language",
                                value="Hindi",
                            )
                        translate_doc_btn = gr.Button("Translate Document", variant="primary")
                    with gr.Column():
                        doc_status = gr.Textbox(
                            label="Status",
                            interactive=False,
                        )
                        doc_output = gr.File(
                            label="Download Translated Document"
                        )

        # Examples only pre-fill the inputs; the user then presses
        # "Translate Text", which carries the real session id. The former
        # `fn` was bound to an empty session and could only return the
        # "please log in" message, so it is not wired up.
        gr.Examples(
            examples=[
                ["Hello, how are you?", "English", "Hindi"],
                ["This is a test sentence for translation.", "English", "Bengali"],
                ["Machine learning is changing the world.", "English", "Tamil"],
                ["नमस्ते, आप कैसे हैं?", "Hindi", "English"],
                ["আমি ভালো আছি।", "Bengali", "Hindi"],
                ["मला खूप आनंद झाला।", "Marathi", "Tamil"],
                ["ನಾನು ಚೆನ್ನಾಗಿದ್ದೇನೆ।", "Kannada", "Telugu"],
            ],
            inputs=[text_input, source_lang_text, target_lang_text],
            cache_examples=False,
        )

        # Logout functionality
        with gr.Row():
            logout_btn = gr.Button("🔓 Logout", variant="secondary", size="sm")

    def handle_login(username, password):
        """Validate credentials and swap the login view for the main view on success."""
        success, session_id = authenticate(username, password)
        if success:
            return (
                gr.Markdown("✅ **Login successful!** Welcome to the translator."),
                gr.Column(visible=False),
                gr.Column(visible=True),
                session_id,
            )
        return (
            gr.Markdown("❌ **Invalid credentials.** Please try again."),
            gr.Column(visible=True),
            gr.Column(visible=False),
            "",
        )

    def handle_logout(session_id):
        """Invalidate the session, clear the login form and show the login view again."""
        if session_id:
            logout_session(session_id)
        return (
            gr.Column(visible=True),
            gr.Column(visible=False),
            "",
            gr.Textbox(value=""),
            gr.Textbox(value=""),
            gr.Markdown("🔓 **Logged out successfully.** Please login again."),
        )

    # Event handlers
    login_btn.click(
        fn=handle_login,
        inputs=[username_input, password_input],
        outputs=[login_status, login_column, main_column, session_state],
    )
    logout_btn.click(
        fn=handle_logout,
        inputs=[session_state],
        outputs=[login_column, main_column, session_state, username_input, password_input, login_status],
    )
    # The handlers already take (…, session_id) in this order, so they are passed directly.
    translate_text_btn.click(
        fn=translate_text_input,
        inputs=[text_input, source_lang_text, target_lang_text, session_state],
        outputs=[text_output],
    )
    translate_doc_btn.click(
        fn=translate_document,
        inputs=[file_input, source_lang_doc, target_lang_doc, session_state],
        outputs=[doc_output, doc_status],
    )

print("IndicTrans2 Translator with Authentication initialized successfully!")

# Launch the app
if __name__ == "__main__":
    # No share=True: Spaces already serves the app publicly and does not
    # support share links, and a local run should not open a public tunnel to
    # a password-protected tool.
    demo.launch()