# Captured from a Hugging Face Space (status at capture time: Sleeping).
import logging
import os
import time
from pathlib import Path
from typing import Dict, Optional

import gradio as gr
import pandas as pd
import PyPDF2
import torch
from deep_translator import GoogleTranslator  # More stable than googletrans
from transformers import AutoModelForCausalLM, AutoTokenizer
# Configure logging: INFO level, with timestamp and level name on every record.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)
# Language mapping with detailed descriptions.
# Keys are the language codes used throughout this file; each entry holds the
# display name, a short description shown in the UI, and the code passed to
# deep_translator as the translation target.
LANGUAGE_MAPPING = {
    "hi": {
        "name": "Hindi - हिन्दी",
        "description": "Official language of India, written in Devanagari script",
        "deep_translator_code": "hi",
    },
    "ta": {
        "name": "Tamil - தமிழ்",
        "description": "Classical language of Tamil Nadu, written in Tamil script",
        "deep_translator_code": "ta",
    },
    "te": {
        "name": "Telugu - తెలుగు",
        "description": "Official language of Andhra Pradesh and Telangana",
        "deep_translator_code": "te",
    },
    "bn": {
        "name": "Bengali - বাংলা",
        "description": "Official language of West Bengal and Bangladesh",
        "deep_translator_code": "bn",
    },
    "mr": {
        "name": "Marathi - मराठी",
        "description": "Official language of Maharashtra",
        "deep_translator_code": "mr",
    },
}
class FileQueryTranslator:
    """Answer a question about an uploaded file and translate the answer.

    Text is extracted from a PDF (PyPDF2), CSV or XLSX (pandas) file, placed
    in a prompt together with the user's query, completed by the
    ``facebook/opt-125m`` causal language model, and the generated text is
    translated with deep_translator's ``GoogleTranslator``.

    Instance attributes set during construction:
        max_retries: number of attempts for model loading and translation.
        retry_delay: seconds to sleep between failed attempts.
        device: the ``torch.device`` chosen by ``setup_device``.
        tokenizer, model: loaded by ``setup_model``.
    """

    # Characters per translation request; the original code kept requests
    # below this size because of the translator's input limit.
    MAX_TRANSLATION_CHUNK_SIZE = 4500
    # Seconds to wait between consecutive translation requests (rate limiting).
    TRANSLATION_PAUSE_SECONDS = 0.5
    # Number of leading characters of the file text included in the prompt.
    MAX_CONTEXT_CHARS = 1000
    # Upper bound on tokens generated for one answer.
    MAX_NEW_TOKENS = 200

    def __init__(self, max_retries=3, retry_delay=1):
        """Store the retry settings, pick a device and load the model.

        Args:
            max_retries: attempts allowed for model loading and translation.
            retry_delay: seconds to wait between attempts.

        Raises:
            RuntimeError: if the model cannot be loaded (see ``setup_model``).
        """
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.setup_device()
        self.setup_model()
        logger.info(f"Initialization complete. Using device: {self.device}")

    def setup_device(self):
        """Select CUDA when available, otherwise CPU; fall back to CPU on error."""
        try:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            if self.device.type == "cuda":
                torch.cuda.empty_cache()
                # total_memory is the device's total capacity, not the free
                # amount, so the message says "Total" rather than "Available".
                total_memory = torch.cuda.get_device_properties(0).total_memory
                logger.info(f"Total CUDA memory (bytes): {total_memory}")
        except Exception as e:
            logger.warning(f"Error setting up CUDA device: {e}. Falling back to CPU.")
            self.device = torch.device("cpu")

    def setup_model(self):
        """Load the tokenizer and model onto ``self.device``, retrying on failure.

        Raises:
            RuntimeError: when every attempt fails; the last underlying
                exception is attached as ``__cause__``.
        """
        model_name = "facebook/opt-125m"  # Using smaller model for stability
        dtype = torch.float16 if self.device.type == "cuda" else torch.float32
        # Always try at least once, even if max_retries was given as 0.
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            try:
                self.tokenizer = AutoTokenizer.from_pretrained(model_name)
                self.model = AutoModelForCausalLM.from_pretrained(
                    model_name,
                    torch_dtype=dtype,
                ).to(self.device)
                if self.device.type == "cuda":
                    torch.cuda.empty_cache()  # Clear CUDA cache
                logger.info(f"Model loaded successfully on {self.device}")
                return
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt == attempts - 1:
                    raise RuntimeError("Failed to load model after maximum retries") from e
                time.sleep(self.retry_delay)

    def extract_text_from_pdf(self, pdf_file: str) -> str:
        """Return the text of every page of ``pdf_file`` joined by newlines.

        A page that fails to extract is replaced by a placeholder line. Any
        other failure is logged and reported as a string starting with
        ``"Error processing PDF:"`` instead of being raised.
        """
        try:
            if not os.path.exists(pdf_file):
                raise FileNotFoundError(f"PDF file not found: {pdf_file}")
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            text = []
            # Index-based access keeps the page lookup inside the per-page
            # try block, so one broken page does not abort the whole file.
            for page_num in range(len(pdf_reader.pages)):
                try:
                    page = pdf_reader.pages[page_num]
                    # Guard against a page yielding no text object at all.
                    text.append(page.extract_text() or "")
                except Exception as e:
                    logger.error(f"Error extracting text from page {page_num}: {e}")
                    text.append(f"[Error extracting page {page_num}]")
            return "\n".join(text)
        except Exception as e:
            logger.error(f"Error processing PDF: {str(e)}")
            return f"Error processing PDF: {str(e)}"

    def extract_text_from_csv(self, csv_file: str) -> str:
        """Return the CSV rendered as a table string (no index column).

        Failures are logged and reported as a string starting with
        ``"Error processing CSV:"`` instead of being raised.
        """
        try:
            if not os.path.exists(csv_file):
                raise FileNotFoundError(f"CSV file not found: {csv_file}")
            return pd.read_csv(csv_file).to_string(index=False)
        except Exception as e:
            logger.error(f"Error processing CSV: {str(e)}")
            return f"Error processing CSV: {str(e)}"

    def extract_text_from_xlsx(self, xlsx_file: str) -> str:
        """Return the spreadsheet read by ``pd.read_excel`` as a table string.

        Failures are logged and reported as a string starting with
        ``"Error processing XLSX:"`` instead of being raised.
        """
        try:
            if not os.path.exists(xlsx_file):
                raise FileNotFoundError(f"XLSX file not found: {xlsx_file}")
            return pd.read_excel(xlsx_file).to_string(index=False)
        except Exception as e:
            logger.error(f"Error processing XLSX: {str(e)}")
            return f"Error processing XLSX: {str(e)}"

    @staticmethod
    def _split_into_chunks(text: str, max_chunk_size: int = 4500) -> list:
        """Split ``text`` into pieces of at most ``max_chunk_size`` characters.

        Cuts are moved back to the last space or newline inside the window so
        words are not split in half; a window without whitespace is cut at
        the size limit. Concatenating the pieces reproduces ``text``.

        Raises:
            ValueError: if ``max_chunk_size`` is smaller than 1.
        """
        if max_chunk_size < 1:
            raise ValueError("max_chunk_size must be at least 1")
        chunks = []
        start = 0
        length = len(text)
        while start < length:
            end = min(start + max_chunk_size, length)
            if end < length:
                window = text[start:end]
                cut = max(window.rfind("\n"), window.rfind(" "))
                if cut > 0:
                    # Keep the whitespace character with the left-hand chunk.
                    end = start + cut + 1
            chunks.append(text[start:end])
            start = end
        return chunks

    def translate_text(self, text: str, target_lang: str) -> str:
        """Translate ``text`` into ``target_lang`` with retries.

        The text is sent in chunks. When an attempt fails, the next attempt
        resumes after the chunks that were already translated instead of
        starting over.

        Returns:
            The translated chunks joined by single spaces, ``""`` for blank
            input, or ``"Translation error: ..."`` once all attempts failed.
        """
        if not text or not text.strip():
            return ""
        # Blank chunks carry nothing to translate, so they are dropped.
        chunks = [
            chunk
            for chunk in self._split_into_chunks(text, self.MAX_TRANSLATION_CHUNK_SIZE)
            if chunk.strip()
        ]
        translated_chunks = []
        last_error = None
        # Always try at least once so the method never falls through to None.
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            try:
                translator = GoogleTranslator(source='auto', target=target_lang)
                for chunk in chunks[len(translated_chunks):]:
                    if translated_chunks:
                        time.sleep(self.TRANSLATION_PAUSE_SECONDS)  # Rate limiting
                    # A missing result becomes "" so the join below cannot fail.
                    translated_chunks.append(translator.translate(chunk) or "")
                return ' '.join(translated_chunks)
            except Exception as e:
                last_error = e
                logger.error(f"Translation attempt {attempt + 1} failed: {str(e)}")
                if attempt < attempts - 1:
                    time.sleep(self.retry_delay)
        return f"Translation error: {str(last_error)}"

    def process_query(self, file_path: str, file_type: str, query: str, language: str) -> str:
        """Answer ``query`` about the file and return the translated answer.

        Args:
            file_path: path of the uploaded file.
            file_type: ``"pdf"``, ``"csv"`` or ``"xlsx"`` (case-insensitive).
            query: the user's question.
            language: a key of ``LANGUAGE_MAPPING``.

        Returns:
            The translated answer, or a human-readable message when the input
            is invalid or any step fails. Exceptions are never propagated.
        """
        try:
            # Validate inputs
            if not file_path or not os.path.exists(file_path):
                return "Please provide a valid file."
            if not query or not query.strip():
                return "Please provide a valid query."
            if language not in LANGUAGE_MAPPING:
                return "Please select a valid language."

            # Extract text based on file type
            extractors = {
                "pdf": self.extract_text_from_pdf,
                "csv": self.extract_text_from_csv,
                "xlsx": self.extract_text_from_xlsx,
            }
            normalized_type = (file_type or "").strip().lower()
            extractor = extractors.get(normalized_type)
            if extractor is None:
                return "Unsupported file type."
            file_text = extractor(file_path)
            # Match the exact prefix the extractor emits on failure, so a
            # document that merely begins with "Error" is still processed.
            if file_text.startswith(f"Error processing {normalized_type.upper()}:"):
                return file_text
            if not file_text.strip():
                return "No readable text could be extracted from the file."

            # Generate response
            context = file_text[:self.MAX_CONTEXT_CHARS]  # Limit content length
            prompt = f"Query: {query}\n\nContent: {context}\n\nAnswer:"
            # Keep the whole encoding so generate() also receives the attention mask.
            inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
            prompt_length = inputs["input_ids"].shape[-1]
            with torch.no_grad():
                output = self.model.generate(
                    **inputs,
                    max_new_tokens=self.MAX_NEW_TOKENS,
                    num_return_sequences=1,
                    do_sample=True,  # temperature only takes effect when sampling
                    temperature=0.7,
                    pad_token_id=self.tokenizer.eos_token_id,
                )
            # The returned sequence starts with the prompt tokens; decode only
            # the continuation so the prompt is not echoed into the answer.
            response = self.tokenizer.decode(
                output[0][prompt_length:],
                skip_special_tokens=True,
            ).strip()
            if not response:
                return "The model did not produce an answer."

            # Translate
            target_lang = LANGUAGE_MAPPING[language]["deep_translator_code"]
            return self.translate_text(response, target_lang)
        except Exception as e:
            logger.error(f"Error in process_query: {str(e)}")
            return f"An error occurred: {str(e)}"
# Gradio interface
def create_interface():
    """Build and return the Gradio Blocks UI for the file query translator.

    Constructing the interface also constructs a ``FileQueryTranslator``,
    which loads the language model. The returned ``gr.Blocks`` object is not
    launched here.
    """
    file_processor = FileQueryTranslator()

    # Dropdown entries have the form "<code> - <name>"; the default entry is
    # derived from the mapping so it cannot drift from the choice format.
    language_choices = [f"{code} - {info['name']}" for code, info in LANGUAGE_MAPPING.items()]
    default_code = "hi"
    default_choice = f"{default_code} - {LANGUAGE_MAPPING[default_code]['name']}"

    def language_code(selected):
        """Return the code part of a dropdown entry ("" when nothing is selected)."""
        return selected.split(" - ", 1)[0] if selected else ""

    def update_description(selected):
        """Return the description for the selected language, or "" if unknown."""
        info = LANGUAGE_MAPPING.get(language_code(selected))
        return info['description'] if info else ""

    def process_and_translate(file_path, file_type, query, language):
        """Run the query and turn any unexpected exception into a message."""
        try:
            return file_processor.process_query(file_path, file_type, query, language_code(language))
        except Exception as e:
            return f"Error processing request: {str(e)}"

    with gr.Blocks() as demo:
        gr.Markdown("### File Query and Translation System")
        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    label="Upload File (PDF, CSV, XLSX)",
                    type="filepath",
                )
                file_type_input = gr.Radio(
                    label="Select File Type",
                    choices=["pdf", "csv", "xlsx"],
                    value="pdf",
                )
                query_input = gr.Textbox(
                    label="Enter your question about the file",
                    placeholder="What would you like to know about the document?",
                )
                language_input = gr.Dropdown(
                    label="Select Output Language",
                    choices=language_choices,
                    value=default_choice,
                )
                language_description = gr.Textbox(
                    label="Language Information",
                    value=LANGUAGE_MAPPING[default_code]['description'],
                    interactive=False,
                )
        with gr.Row():
            output_text = gr.Textbox(
                label="Translated Answer",
                placeholder="Translation will appear here...",
                lines=5,
            )

        # Event handlers
        language_input.change(
            fn=update_description,
            inputs=[language_input],
            outputs=[language_description],
        )
        submit_button = gr.Button("Get Answer")
        submit_button.click(
            fn=process_and_translate,
            inputs=[file_input, file_type_input, query_input, language_input],
            outputs=output_text,
        )
    return demo
# Script entry point: build the UI, enable request queueing, then start the
# server with a public share link requested.
if __name__ == "__main__":
    demo = create_interface()
    demo.queue()  # Enable queueing
    demo.launch(share=True)