| import os |
| import re |
| from typing import Tuple, Optional |
| from pathlib import Path |
| from pypdf import PdfReader |
| from docx import Document |
|
|
# Matches one or more consecutive code points from the emoji ranges below.
# Each entry is an inclusive "first-last" range inside one character class.
_EMOJI_RE = re.compile(
    "[" + "".join((
        "\U0001F600-\U0001F64F",  # Emoticons
        "\U0001F300-\U0001F5FF",  # Miscellaneous Symbols and Pictographs
        "\U0001F680-\U0001F6FF",  # Transport and Map Symbols
        "\U0001F1E0-\U0001F1FF",  # Regional indicator symbols (flags)
        "\U0001F900-\U0001F9FF",  # Supplemental Symbols and Pictographs
        "\U0001FA00-\U0001FA6F",  # Chess Symbols
        "\U0001FA70-\U0001FAFF",  # Symbols and Pictographs Extended-A
    )) + "]+",
    flags=re.UNICODE,
)
|
|
class FileParser:
    """
    Parse multiple file formats and extract text.
    Supports: PDF, DOCX, TXT, and raw text input.

    Every parser reports its outcome as a ``(text, format, error)`` tuple
    instead of raising: on success ``error`` is None; on failure ``text`` is
    the empty string and ``error`` holds the exception that was caught.
    """

    SUPPORTED_FORMATS = {".pdf", ".docx", ".doc", ".txt"}

    @staticmethod
    def parse_file(file_path: str) -> Tuple[str, str, Optional[Exception]]:
        """
        Parse a file and extract text, choosing the parser by extension.

        The extension is compared case-insensitively.

        Args:
            file_path: Path to the file

        Returns:
            Tuple of (text, format, error)
            - text: Extracted text content ("" on failure)
            - format: File format (pdf, docx, txt); "" when the extension
              is not supported
            - error: Exception if parsing failed, None if successful
        """
        file_extension = Path(file_path).suffix.lower()

        if file_extension not in FileParser.SUPPORTED_FORMATS:
            error = ValueError(f"Unsupported file format: {file_extension}")
            return "", "", error

        if file_extension == ".pdf":
            return FileParser.parse_pdf(file_path)
        if file_extension in {".docx", ".doc"}:
            # NOTE(review): python-docx targets the .docx (OOXML) format; a
            # legacy binary .doc will most likely fail to open and come back
            # through the error slot of the tuple -- confirm this is intended.
            return FileParser.parse_docx(file_path)
        if file_extension == ".txt":
            return FileParser.parse_txt(file_path)

        # Defensive: only reachable if SUPPORTED_FORMATS gains an extension
        # that has no branch above.
        return "", "", ValueError("Unknown error")

    @staticmethod
    def parse_pdf(file_path: str) -> Tuple[str, str, Optional[Exception]]:
        """
        Extract text from PDF file.

        Page texts are joined with newlines, in page order, and the result
        is stripped of leading/trailing whitespace.

        Args:
            file_path: Path to the PDF file

        Returns:
            (text, "pdf", None) on success, ("", "pdf", exception) on failure.
        """
        try:
            with open(file_path, 'rb') as pdf_file:
                pdf_reader = PdfReader(pdf_file)
                # Extract while the file is still open. `or ""` keeps a page
                # that yields no text from failing the whole document.
                page_texts = [
                    page.extract_text() or "" for page in pdf_reader.pages
                ]

            return "\n".join(page_texts).strip(), "pdf", None

        except Exception as e:
            return "", "pdf", e

    @staticmethod
    def parse_docx(file_path: str) -> Tuple[str, str, Optional[Exception]]:
        """
        Extract text from DOCX file.

        All paragraph texts come first, followed by the text of every table
        cell (table by table, row by row), one item per line.

        Args:
            file_path: Path to the DOCX file

        Returns:
            (text, "docx", None) on success, ("", "docx", exception) on
            failure.
        """
        try:
            doc = Document(file_path)

            pieces = [paragraph.text for paragraph in doc.paragraphs]
            for table in doc.tables:
                for row in table.rows:
                    pieces.extend(cell.text for cell in row.cells)

            return "\n".join(pieces).strip(), "docx", None

        except Exception as e:
            return "", "docx", e

    @staticmethod
    def parse_txt(file_path: str) -> Tuple[str, str, Optional[Exception]]:
        """
        Extract text from plain text file.

        The file is decoded as UTF-8 (a leading byte-order mark, if present,
        is dropped); if that fails to decode, it is re-read as latin-1.

        Args:
            file_path: Path to the text file

        Returns:
            (text, "txt", None) on success, ("", "txt", exception) on failure.
        """
        try:
            # 'utf-8-sig' reads plain UTF-8 too, but also removes a leading
            # BOM, which str.strip() would otherwise leave in the text.
            with open(file_path, 'r', encoding='utf-8-sig') as txt_file:
                return txt_file.read().strip(), "txt", None
        except UnicodeDecodeError:
            pass  # not valid UTF-8: fall through to the latin-1 attempt
        except Exception as e:
            return "", "txt", e

        try:
            # latin-1 maps every byte to a character, so decoding itself
            # cannot fail here; only I/O errors can.
            with open(file_path, 'r', encoding='latin-1') as txt_file:
                return txt_file.read().strip(), "txt", None
        except Exception as e:
            return "", "txt", e

    @staticmethod
    def parse_raw_text(text: str) -> Tuple[str, str, Optional[Exception]]:
        """
        Process raw text input.

        Args:
            text: Text supplied directly rather than read from a file

        Returns:
            (stripped text, "raw", None) when the stripped text is non-empty;
            ("", "raw", ValueError) when it is empty; ("", "raw", exception)
            if stripping fails (e.g. ``text`` is not a string).
        """
        try:
            cleaned_text = text.strip()
            if not cleaned_text:
                return "", "raw", ValueError("Empty text provided")
            return cleaned_text, "raw", None
        except Exception as e:
            return "", "raw", e
|
|
class TextCleaner:
    """Clean and normalize extracted text"""

    @staticmethod
    def clean(text: str) -> str:
        """
        Collapse whitespace in text.

        Every run of whitespace -- spaces, tabs and line breaks alike -- is
        replaced by a single space, and leading/trailing whitespace is
        removed. The result therefore never contains a line break.

        Args:
            text: Text to clean

        Returns:
            The text as single-space-separated tokens.
        """
        # split() with no argument already consumes '\r' and '\n', so no
        # separate line-ending normalization is needed afterwards.
        return ' '.join(text.split())

    @staticmethod
    def get_text_stats(text: str) -> dict:
        """
        Get statistics about text.

        Words are whitespace-separated tokens (punctuation attached to a
        token counts as part of it). Sentences are the non-blank fragments
        obtained by splitting on '.'.

        Args:
            text: Text to analyse

        Returns:
            Dict with the keys:
            - character_count: length of the text, whitespace included
            - word_count: number of words
            - sentence_count: number of sentences
            - average_word_length: mean characters per word (0 if no words)
            - average_sentence_length: mean words per sentence (0 if no
              sentences)
            - emoji_count: number of code points matched by _EMOJI_RE
            - em_dash_count: occurrences of U+2014
            - arrow_count: occurrences of U+2192
        """
        words = text.split()
        word_count = len(words)
        # Count only non-blank fragments so the trailing "" left after a
        # final '.' is not treated as a sentence.
        sentence_count = sum(1 for part in text.split('.') if part.strip())
        # Characters that belong to words; whitespace is excluded so it does
        # not inflate the average word length.
        word_chars = sum(len(word) for word in words)

        return {
            "character_count": len(text),
            "word_count": word_count,
            "sentence_count": sentence_count,
            "average_word_length": word_chars / word_count if word_count else 0,
            "average_sentence_length": (
                word_count / sentence_count if sentence_count else 0
            ),
            "emoji_count": sum(len(m) for m in _EMOJI_RE.findall(text)),
            "em_dash_count": text.count('\u2014'),
            "arrow_count": text.count('\u2192'),
        }
|
|