Anupam251272's picture
Create app.py
45a5021 verified
import gradio as gr
import PyPDF2
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from deep_translator import GoogleTranslator # More stable than googletrans
import logging
from typing import Optional, Dict
import time
from pathlib import Path
import os
import pandas as pd
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Language mapping with detailed descriptions
LANGUAGE_MAPPING = {
"hi": {
"name": "Hindi - हिन्दी",
"description": "Official language of India, written in Devanagari script",
"deep_translator_code": "hi"
},
"ta": {
"name": "Tamil - தமிழ்",
"description": "Classical language of Tamil Nadu, written in Tamil script",
"deep_translator_code": "ta"
},
"te": {
"name": "Telugu - తెలుగు",
"description": "Official language of Andhra Pradesh and Telangana",
"deep_translator_code": "te"
},
"bn": {
"name": "Bengali - বাংলা",
"description": "Official language of West Bengal and Bangladesh",
"deep_translator_code": "bn"
},
"mr": {
"name": "Marathi - मराठी",
"description": "Official language of Maharashtra",
"deep_translator_code": "mr"
}
}
class FileQueryTranslator:
def __init__(self, max_retries=3, retry_delay=1):
self.max_retries = max_retries
self.retry_delay = retry_delay
self.setup_device()
self.setup_model()
logger.info(f"Initialization complete. Using device: {self.device}")
def setup_device(self):
"""Setup CUDA device with error handling"""
try:
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if self.device.type == "cuda":
# Check CUDA memory
torch.cuda.empty_cache()
logger.info(f"Available CUDA memory: {torch.cuda.get_device_properties(0).total_memory}")
except Exception as e:
logger.warning(f"Error setting up CUDA device: {e}. Falling back to CPU.")
self.device = torch.device("cpu")
def setup_model(self):
"""Initialize the model with retry mechanism"""
for attempt in range(self.max_retries):
try:
model_name = "facebook/opt-125m" # Using smaller model for stability
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32
)
if self.device.type == "cuda":
self.model = self.model.to(self.device)
torch.cuda.empty_cache() # Clear CUDA cache
else:
self.model = self.model.to(self.device)
logger.info(f"Model loaded successfully on {self.device}")
break
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay)
else:
raise Exception("Failed to load model after maximum retries")
def extract_text_from_pdf(self, pdf_file: str) -> str:
"""Extract text from PDF with robust error handling"""
try:
if not os.path.exists(pdf_file):
raise FileNotFoundError(f"PDF file not found: {pdf_file}")
pdf_reader = PyPDF2.PdfReader(pdf_file)
text = []
for page_num in range(len(pdf_reader.pages)):
try:
page = pdf_reader.pages[page_num]
text.append(page.extract_text())
except Exception as e:
logger.error(f"Error extracting text from page {page_num}: {e}")
text.append(f"[Error extracting page {page_num}]")
return "\n".join(text)
except Exception as e:
logger.error(f"Error processing PDF: {str(e)}")
return f"Error processing PDF: {str(e)}"
def extract_text_from_csv(self, csv_file: str) -> str:
"""Extract text from CSV with robust error handling"""
try:
if not os.path.exists(csv_file):
raise FileNotFoundError(f"CSV file not found: {csv_file}")
df = pd.read_csv(csv_file)
text = df.to_string(index=False)
return text
except Exception as e:
logger.error(f"Error processing CSV: {str(e)}")
return f"Error processing CSV: {str(e)}"
def extract_text_from_xlsx(self, xlsx_file: str) -> str:
"""Extract text from XLSX with robust error handling"""
try:
if not os.path.exists(xlsx_file):
raise FileNotFoundError(f"XLSX file not found: {xlsx_file}")
df = pd.read_excel(xlsx_file)
text = df.to_string(index=False)
return text
except Exception as e:
logger.error(f"Error processing XLSX: {str(e)}")
return f"Error processing XLSX: {str(e)}"
def translate_text(self, text: str, target_lang: str) -> str:
"""Translate text using deep_translator with retry mechanism"""
for attempt in range(self.max_retries):
try:
translator = GoogleTranslator(source='auto', target=target_lang)
# Split text into chunks if it's too long (Google Translate limit)
max_chunk_size = 4500
chunks = [text[i:i + max_chunk_size] for i in range(0, len(text), max_chunk_size)]
translated_chunks = []
for chunk in chunks:
translated_chunk = translator.translate(chunk)
translated_chunks.append(translated_chunk)
time.sleep(0.5) # Rate limiting
return ' '.join(translated_chunks)
except Exception as e:
logger.error(f"Translation attempt {attempt + 1} failed: {str(e)}")
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay)
else:
return f"Translation error: {str(e)}"
def process_query(self, file_path: str, file_type: str, query: str, language: str) -> str:
"""Process query with comprehensive error handling"""
try:
# Validate inputs
if not file_path or not os.path.exists(file_path):
return "Please provide a valid file."
if not query.strip():
return "Please provide a valid query."
if language not in LANGUAGE_MAPPING:
return "Please select a valid language."
# Extract text based on file type
if file_type == "pdf":
file_text = self.extract_text_from_pdf(file_path)
elif file_type == "csv":
file_text = self.extract_text_from_csv(file_path)
elif file_type == "xlsx":
file_text = self.extract_text_from_xlsx(file_path)
else:
return "Unsupported file type."
if file_text.startswith("Error"):
return file_text
# Generate response
prompt = f"Query: {query}\n\nContent: {file_text[:1000]}\n\nAnswer:" # Limit content length
input_ids = self.tokenizer(prompt, return_tensors="pt").input_ids.to(self.device)
with torch.no_grad():
output = self.model.generate(
input_ids,
max_new_tokens=200, # Use max_new_tokens instead of max_length
num_return_sequences=1,
temperature=0.7,
pad_token_id=self.tokenizer.eos_token_id
)
response = self.tokenizer.decode(output[0], skip_special_tokens=True)
# Translate
target_lang = LANGUAGE_MAPPING[language]["deep_translator_code"]
translated_response = self.translate_text(response, target_lang)
return translated_response
except Exception as e:
logger.error(f"Error in process_query: {str(e)}")
return f"An error occurred: {str(e)}"
# Gradio interface
def create_interface():
file_processor = FileQueryTranslator()
with gr.Blocks() as demo:
gr.Markdown("### File Query and Translation System")
with gr.Row():
with gr.Column():
file_input = gr.File(
label="Upload File (PDF, CSV, XLSX)",
type="filepath"
)
file_type_input = gr.Radio(
label="Select File Type",
choices=["pdf", "csv", "xlsx"],
value="pdf"
)
query_input = gr.Textbox(
label="Enter your question about the file",
placeholder="What would you like to know about the document?"
)
language_input = gr.Dropdown(
label="Select Output Language",
choices=[f"{code} - {info['name']}" for code, info in LANGUAGE_MAPPING.items()],
value="hi - Hindi - हिन्दी"
)
language_description = gr.Textbox(
label="Language Information",
value=LANGUAGE_MAPPING['hi']['description'],
interactive=False
)
with gr.Row():
output_text = gr.Textbox(
label="Translated Answer",
placeholder="Translation will appear here...",
lines=5
)
def update_description(selected):
code = selected.split(" - ")[0]
return LANGUAGE_MAPPING[code]['description']
def process_and_translate(file_path, file_type, query, language):
try:
lang_code = language.split(" - ")[0]
return file_processor.process_query(file_path, file_type, query, lang_code)
except Exception as e:
return f"Error processing request: {str(e)}"
# Event handlers
language_input.change(
fn=update_description,
inputs=[language_input],
outputs=[language_description]
)
submit_button = gr.Button("Get Answer")
submit_button.click(
fn=process_and_translate,
inputs=[file_input, file_type_input, query_input, language_input],
outputs=output_text
)
return demo
if __name__ == "__main__":
demo = create_interface()
demo.queue() # Enable queueing
demo.launch(share=True)