Mgolo committed on
Commit
5ce9ccf
·
verified ·
1 Parent(s): eafa517

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +100 -158
app.py CHANGED
@@ -1,8 +1,10 @@
1
  """
2
  LocaleNLP Translation Service
3
  ============================
 
4
  A multi-language translation application supporting English, Wolof, Hausa, and Darija.
5
  Features text, audio, and document translation with automatic chaining for all language pairs.
 
6
  Author: LocaleNLP
7
  """
8
 
@@ -10,7 +12,7 @@ import os
10
  import re
11
  import logging
12
  import tempfile
13
- from typing import Optional, Dict, Tuple, Any, Union, List
14
  from pathlib import Path
15
  from dataclasses import dataclass
16
  from enum import Enum
@@ -131,33 +133,26 @@ class ModelManager:
131
 
132
  # Authenticate with Hugging Face if token provided
133
  if hf_token := os.getenv("hffff"):
134
- try:
135
- login(token=hf_token)
136
- except Exception as e:
137
- logger.warning(f"HF login failed: {e}")
138
 
139
- try:
140
- model = AutoModelForSeq2SeqLM.from_pretrained(
141
- config.model_name,
142
- token=hf_token if hf_token else None
143
- ).to(self._get_device())
144
-
145
- tokenizer = MarianTokenizer.from_pretrained(
146
- config.model_name,
147
- token=hf_token if hf_token else None
148
- )
149
-
150
- self._translation_pipeline = pipeline(
151
- "translation",
152
- model=model,
153
- tokenizer=tokenizer,
154
- device=0 if self._get_device().type == "cuda" else -1
155
- )
156
-
157
- self._current_model_name = config.model_name
158
- except Exception as e:
159
- logger.error(f"Failed to load model {config.model_name}: {e}")
160
- raise
161
 
162
  return self._translation_pipeline, config.language_tag
163
 
@@ -170,11 +165,7 @@ class ModelManager:
170
  """
171
  if self._whisper_model is None:
172
  logger.info("Loading Whisper base model...")
173
- try:
174
- self._whisper_model = whisper.load_model("base")
175
- except Exception as e:
176
- logger.error(f"Failed to load Whisper model: {e}")
177
- raise
178
  return self._whisper_model
179
 
180
  def _get_device(self) -> torch.device:
@@ -207,18 +198,20 @@ class ContentProcessor:
207
  extension = file_path.suffix.lower()
208
 
209
  try:
 
 
210
  if extension == ".pdf":
211
- return ContentProcessor._extract_pdf_text(file_path)
212
  elif extension == ".docx":
213
  return ContentProcessor._extract_docx_text(file_path)
214
  elif extension in (".html", ".htm"):
215
- return ContentProcessor._extract_html_text(file_path)
216
  elif extension == ".md":
217
- return ContentProcessor._extract_markdown_text(file_path)
218
  elif extension == ".srt":
219
- return ContentProcessor._extract_srt_text(file_path)
220
  elif extension in (".txt", ".text"):
221
- return ContentProcessor._extract_plain_text(file_path)
222
  else:
223
  raise ValueError(f"Unsupported file type: {extension}")
224
 
@@ -227,30 +220,28 @@ class ContentProcessor:
227
  raise
228
 
229
  @staticmethod
230
- def _extract_pdf_text(file_path: Path) -> str:
231
  """Extract text from PDF file."""
232
- with fitz.open(file_path) as doc:
233
  return "\n".join(page.get_text() for page in doc)
234
 
235
  @staticmethod
236
  def _extract_docx_text(file_path: Path) -> str:
237
  """Extract text from DOCX file."""
238
- doc = docx.Document(file_path)
239
  return "\n".join(paragraph.text for paragraph in doc.paragraphs)
240
 
241
  @staticmethod
242
- def _extract_html_text(file_path: Path) -> str:
243
  """Extract text from HTML file."""
244
- content = file_path.read_bytes()
245
  encoding = chardet.detect(content)["encoding"] or "utf-8"
246
  text = content.decode(encoding, errors="ignore")
247
  soup = BeautifulSoup(text, "html.parser")
248
  return soup.get_text()
249
 
250
  @staticmethod
251
- def _extract_markdown_text(file_path: Path) -> str:
252
  """Extract text from Markdown file."""
253
- content = file_path.read_bytes()
254
  encoding = chardet.detect(content)["encoding"] or "utf-8"
255
  text = content.decode(encoding, errors="ignore")
256
  html = markdown(text)
@@ -258,18 +249,16 @@ class ContentProcessor:
258
  return soup.get_text()
259
 
260
  @staticmethod
261
- def _extract_srt_text(file_path: Path) -> str:
262
  """Extract text from SRT subtitle file."""
263
- content = file_path.read_bytes()
264
  encoding = chardet.detect(content)["encoding"] or "utf-8"
265
  text = content.decode(encoding, errors="ignore")
266
  # Remove timestamp lines
267
  return re.sub(r"\d+\n\d{2}:\d{2}:\d{2},\d{3} --> .*?\n", "", text)
268
 
269
  @staticmethod
270
- def _extract_plain_text(file_path: Path) -> str:
271
  """Extract text from plain text file."""
272
- content = file_path.read_bytes()
273
  encoding = chardet.detect(content)["encoding"] or "utf-8"
274
  return content.decode(encoding, errors="ignore")
275
 
@@ -317,15 +306,11 @@ class TranslationService:
317
  target_lang: Language
318
  ) -> str:
319
  """Perform direct translation using available model."""
320
- try:
321
- pipeline_obj, lang_tag = self.model_manager.get_translation_pipeline(
322
- source_lang, target_lang
323
- )
324
-
325
- return self._process_text_with_pipeline(text, pipeline_obj, lang_tag)
326
- except Exception as e:
327
- logger.error(f"Direct translation error: {e}")
328
- return f"Translation error: {str(e)}"
329
 
330
  def _chained_translate(
331
  self,
@@ -344,21 +329,17 @@ class TranslationService:
344
  Returns:
345
  Translated text through chaining
346
  """
347
- try:
348
- # First: source_lang -> English
349
- intermediate_text = self._direct_translate(
350
- text, source_lang, Language.ENGLISH
351
- )
352
-
353
- # Second: English -> target_lang
354
- final_text = self._direct_translate(
355
- intermediate_text, Language.ENGLISH, target_lang
356
- )
357
-
358
- return final_text
359
- except Exception as e:
360
- logger.error(f"Chained translation error: {e}")
361
- return f"Chained translation error: {str(e)}"
362
 
363
  def _process_text_with_pipeline(
364
  self,
@@ -383,38 +364,30 @@ class TranslationService:
383
  if s.strip()
384
  ]
385
 
386
- if not sentences:
387
- translated_paragraphs.append("")
388
- continue
389
-
390
  # Add language tag to each sentence
391
  formatted_sentences = [
392
  f"{lang_tag} {sentence}"
393
  for sentence in sentences
394
  ]
395
 
396
- try:
397
- # Perform translation
398
- results = pipeline_obj(
399
- formatted_sentences,
400
- max_length=5000,
401
- num_beams=5,
402
- early_stopping=True,
403
- no_repeat_ngram_size=3,
404
- repetition_penalty=1.5,
405
- length_penalty=1.2
406
- )
407
-
408
- # Process results
409
- translated_sentences = [
410
- result["translation_text"].capitalize()
411
- for result in results
412
- ]
413
-
414
- translated_paragraphs.append(". ".join(translated_sentences))
415
- except Exception as e:
416
- logger.error(f"Pipeline processing error: {e}")
417
- translated_paragraphs.append(f"[Translation Error: {str(e)}]")
418
 
419
  return "\n".join(translated_paragraphs)
420
 
@@ -438,13 +411,9 @@ class AudioProcessor:
438
  Returns:
439
  Transcribed text
440
  """
441
- try:
442
- model = self.model_manager.get_whisper_model()
443
- result = model.transcribe(audio_file_path)
444
- return result["text"]
445
- except Exception as e:
446
- logger.error(f"Transcription error: {e}")
447
- return f"Transcription error: {str(e)}"
448
 
449
  # ================================
450
  # Main Application
@@ -465,7 +434,7 @@ class TranslationApp:
465
  source_lang: Language,
466
  text_input: str,
467
  audio_file: Optional[str],
468
- file_obj: Optional[Any]
469
  ) -> str:
470
  """
471
  Process input based on selected mode.
@@ -480,29 +449,22 @@ class TranslationApp:
480
  Returns:
481
  Processed text content
482
  """
483
- try:
484
- if mode == InputMode.TEXT:
485
- return text_input
486
-
487
- elif mode == InputMode.AUDIO:
488
- if source_lang != Language.ENGLISH:
489
- return "Audio input must be in English."
490
- if not audio_file:
491
- return "No audio file provided."
492
- return self.audio_processor.transcribe(audio_file)
493
-
494
- elif mode == InputMode.FILE:
495
- if not file_obj:
496
- return "No file uploaded."
497
-
498
- # Handle Gradio file object (could be a string path or a file-like object)
499
- file_path = file_obj.name if hasattr(file_obj, 'name') else file_obj
500
- return self.content_processor.extract_text_from_file(file_path)
501
-
502
- return ""
503
- except Exception as e:
504
- logger.error(f"Input processing error: {e}")
505
- return f"Input processing error: {str(e)}"
506
 
507
  def create_interface(self) -> gr.Blocks:
508
  """Create and return the Gradio interface."""
@@ -511,22 +473,6 @@ class TranslationApp:
511
  title="LocaleNLP Translation Service",
512
  theme=gr.themes.Monochrome()
513
  ) as interface:
514
- # Custom CSS for black button
515
- gr.HTML("""
516
- <style>
517
- .gr-button-secondary {
518
- background-color: #000000 !important;
519
- border-color: #000000 !important;
520
- color: white !important;
521
- }
522
- .gr-button-secondary:hover {
523
- background-color: #333333 !important;
524
- border-color: #333333 !important;
525
- color: white !important;
526
- }
527
- </style>
528
- """)
529
-
530
  # Header
531
  gr.Markdown("""
532
  # 🌍 LocaleNLP Translation Service
@@ -592,26 +538,22 @@ class TranslationApp:
592
  )
593
 
594
  # Event handlers
595
- def update_visibility(mode: str) -> List[Dict[str, Any]]:
596
  """Update component visibility based on input mode."""
597
- visibility_text = mode == InputMode.TEXT.value
598
- visibility_audio = mode == InputMode.AUDIO.value
599
- visibility_file = mode == InputMode.FILE.value
600
-
601
- return [
602
- gr.update(visible=visibility_text),
603
- gr.update(visible=visibility_audio),
604
- gr.update(visible=visibility_file),
605
- gr.update(value="", visible=True),
606
- gr.update(value="")
607
- ]
608
 
609
  def handle_process(
610
  mode: str,
611
  source_lang: str,
612
  text_input: str,
613
  audio_file: Optional[str],
614
- file_obj: Optional[Any]
615
  ) -> Tuple[str, str]:
616
  """Handle initial input processing."""
617
  try:
 
1
  """
2
  LocaleNLP Translation Service
3
  ============================
4
+
5
  A multi-language translation application supporting English, Wolof, Hausa, and Darija.
6
  Features text, audio, and document translation with automatic chaining for all language pairs.
7
+
8
  Author: LocaleNLP
9
  """
10
 
 
12
  import re
13
  import logging
14
  import tempfile
15
+ from typing import Optional, Dict, Tuple, Any, Union
16
  from pathlib import Path
17
  from dataclasses import dataclass
18
  from enum import Enum
 
133
 
134
  # Authenticate with Hugging Face if token provided
135
  if hf_token := os.getenv("hffff"):
136
+ login(token=hf_token)
 
 
 
137
 
138
+ model = AutoModelForSeq2SeqLM.from_pretrained(
139
+ config.model_name,
140
+ token=hf_token
141
+ ).to(self._get_device())
142
+
143
+ tokenizer = MarianTokenizer.from_pretrained(
144
+ config.model_name,
145
+ token=hf_token
146
+ )
147
+
148
+ self._translation_pipeline = pipeline(
149
+ "translation",
150
+ model=model,
151
+ tokenizer=tokenizer,
152
+ device=0 if self._get_device().type == "cuda" else -1
153
+ )
154
+
155
+ self._current_model_name = config.model_name
 
 
 
 
156
 
157
  return self._translation_pipeline, config.language_tag
158
 
 
165
  """
166
  if self._whisper_model is None:
167
  logger.info("Loading Whisper base model...")
168
+ self._whisper_model = whisper.load_model("base")
 
 
 
 
169
  return self._whisper_model
170
 
171
  def _get_device(self) -> torch.device:
 
198
  extension = file_path.suffix.lower()
199
 
200
  try:
201
+ content = file_path.read_bytes()
202
+
203
  if extension == ".pdf":
204
+ return ContentProcessor._extract_pdf_text(content)
205
  elif extension == ".docx":
206
  return ContentProcessor._extract_docx_text(file_path)
207
  elif extension in (".html", ".htm"):
208
+ return ContentProcessor._extract_html_text(content)
209
  elif extension == ".md":
210
+ return ContentProcessor._extract_markdown_text(content)
211
  elif extension == ".srt":
212
+ return ContentProcessor._extract_srt_text(content)
213
  elif extension in (".txt", ".text"):
214
+ return ContentProcessor._extract_plain_text(content)
215
  else:
216
  raise ValueError(f"Unsupported file type: {extension}")
217
 
 
220
  raise
221
 
222
  @staticmethod
223
+ def _extract_pdf_text(content: bytes) -> str:
224
  """Extract text from PDF file."""
225
+ with fitz.open(stream=content, filetype="pdf") as doc:
226
  return "\n".join(page.get_text() for page in doc)
227
 
228
  @staticmethod
229
  def _extract_docx_text(file_path: Path) -> str:
230
  """Extract text from DOCX file."""
231
+ doc = docx.Document(str(file_path))
232
  return "\n".join(paragraph.text for paragraph in doc.paragraphs)
233
 
234
  @staticmethod
235
+ def _extract_html_text(content: bytes) -> str:
236
  """Extract text from HTML file."""
 
237
  encoding = chardet.detect(content)["encoding"] or "utf-8"
238
  text = content.decode(encoding, errors="ignore")
239
  soup = BeautifulSoup(text, "html.parser")
240
  return soup.get_text()
241
 
242
  @staticmethod
243
+ def _extract_markdown_text(content: bytes) -> str:
244
  """Extract text from Markdown file."""
 
245
  encoding = chardet.detect(content)["encoding"] or "utf-8"
246
  text = content.decode(encoding, errors="ignore")
247
  html = markdown(text)
 
249
  return soup.get_text()
250
 
251
  @staticmethod
252
+ def _extract_srt_text(content: bytes) -> str:
253
  """Extract text from SRT subtitle file."""
 
254
  encoding = chardet.detect(content)["encoding"] or "utf-8"
255
  text = content.decode(encoding, errors="ignore")
256
  # Remove timestamp lines
257
  return re.sub(r"\d+\n\d{2}:\d{2}:\d{2},\d{3} --> .*?\n", "", text)
258
 
259
  @staticmethod
260
+ def _extract_plain_text(content: bytes) -> str:
261
  """Extract text from plain text file."""
 
262
  encoding = chardet.detect(content)["encoding"] or "utf-8"
263
  return content.decode(encoding, errors="ignore")
264
 
 
306
  target_lang: Language
307
  ) -> str:
308
  """Perform direct translation using available model."""
309
+ pipeline_obj, lang_tag = self.model_manager.get_translation_pipeline(
310
+ source_lang, target_lang
311
+ )
312
+
313
+ return self._process_text_with_pipeline(text, pipeline_obj, lang_tag)
 
 
 
 
314
 
315
  def _chained_translate(
316
  self,
 
329
  Returns:
330
  Translated text through chaining
331
  """
332
+ # First: source_lang -> English
333
+ intermediate_text = self._direct_translate(
334
+ text, source_lang, Language.ENGLISH
335
+ )
336
+
337
+ # Second: English -> target_lang
338
+ final_text = self._direct_translate(
339
+ intermediate_text, Language.ENGLISH, target_lang
340
+ )
341
+
342
+ return final_text
 
 
 
 
343
 
344
  def _process_text_with_pipeline(
345
  self,
 
364
  if s.strip()
365
  ]
366
 
 
 
 
 
367
  # Add language tag to each sentence
368
  formatted_sentences = [
369
  f"{lang_tag} {sentence}"
370
  for sentence in sentences
371
  ]
372
 
373
+ # Perform translation
374
+ results = pipeline_obj(
375
+ formatted_sentences,
376
+ max_length=5000,
377
+ num_beams=5,
378
+ early_stopping=True,
379
+ no_repeat_ngram_size=3,
380
+ repetition_penalty=1.5,
381
+ length_penalty=1.2
382
+ )
383
+
384
+ # Process results
385
+ translated_sentences = [
386
+ result["translation_text"].capitalize()
387
+ for result in results
388
+ ]
389
+
390
+ translated_paragraphs.append(". ".join(translated_sentences))
 
 
 
 
391
 
392
  return "\n".join(translated_paragraphs)
393
 
 
411
  Returns:
412
  Transcribed text
413
  """
414
+ model = self.model_manager.get_whisper_model()
415
+ result = model.transcribe(audio_file_path)
416
+ return result["text"]
 
 
 
 
417
 
418
  # ================================
419
  # Main Application
 
434
  source_lang: Language,
435
  text_input: str,
436
  audio_file: Optional[str],
437
+ file_obj: Optional[gr.FileData]
438
  ) -> str:
439
  """
440
  Process input based on selected mode.
 
449
  Returns:
450
  Processed text content
451
  """
452
+ if mode == InputMode.TEXT:
453
+ return text_input
454
+
455
+ elif mode == InputMode.AUDIO:
456
+ if source_lang != Language.ENGLISH:
457
+ raise ValueError("Audio input must be in English.")
458
+ if not audio_file:
459
+ raise ValueError("No audio file provided.")
460
+ return self.audio_processor.transcribe(audio_file)
461
+
462
+ elif mode == InputMode.FILE:
463
+ if not file_obj:
464
+ raise ValueError("No file uploaded.")
465
+ return self.content_processor.extract_text_from_file(file_obj.name)
466
+
467
+ return ""
 
 
 
 
 
 
 
468
 
469
  def create_interface(self) -> gr.Blocks:
470
  """Create and return the Gradio interface."""
 
473
  title="LocaleNLP Translation Service",
474
  theme=gr.themes.Monochrome()
475
  ) as interface:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
476
  # Header
477
  gr.Markdown("""
478
  # 🌍 LocaleNLP Translation Service
 
538
  )
539
 
540
  # Event handlers
541
def update_visibility(mode: str) -> Dict[Any, Any]:
    """Update component visibility based on input mode.

    Args:
        mode: The selected input mode, compared against the ``value``
            of each ``InputMode`` member.

    Returns:
        A mapping from component object to its ``gr.update(...)``.
        Exactly the input widget matching ``mode`` is made visible;
        the extracted-text and output boxes are cleared.
    """
    # Keys are the component objects themselves (not strings), which is
    # why the mapping is annotated Dict[Any, Any].
    return {
        input_text: gr.update(visible=(mode == InputMode.TEXT.value)),
        audio_input: gr.update(visible=(mode == InputMode.AUDIO.value)),
        file_input: gr.update(visible=(mode == InputMode.FILE.value)),
        extracted_text: gr.update(value="", visible=True),
        output_text: gr.update(value=""),
    }
 
 
 
 
550
 
551
  def handle_process(
552
  mode: str,
553
  source_lang: str,
554
  text_input: str,
555
  audio_file: Optional[str],
556
+ file_obj: Optional[gr.FileData]
557
  ) -> Tuple[str, str]:
558
  """Handle initial input processing."""
559
  try: