"""DOCX file processor."""

import os
import re
from typing import Any, Dict, List

from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, FileNotFoundError


class DOCXProcessor(BaseProcessor):
    """Processor for Microsoft Word DOCX and DOC files.

    ``.docx`` files are read with python-docx and rendered as Markdown-style
    text: every non-empty paragraph first (heading styles become ``#``
    headings), followed by every table as a pipe table.  ``.doc`` files are
    passed to pypandoc.  Both packages are imported lazily inside the methods
    that need them, so a missing package surfaces as a ``ConversionError`` at
    processing time rather than when this module is imported.
    """

    # File extensions (lower-case, leading dot included) this processor accepts.
    SUPPORTED_EXTENSIONS = ('.docx', '.doc')

    # Maximum number of rows rendered per table; longer tables are truncated
    # and a notice is emitted.  Subclasses may override this.
    TABLE_ROW_LIMIT = 20

    # A whitespace-normalised line that opens a level 2-6 Markdown heading.
    _HEADING_LINE = re.compile(r'#{2,6} ')

    @staticmethod
    def _extension(file_path: str) -> str:
        """Return the lower-cased extension of *file_path*, dot included."""
        return os.path.splitext(str(file_path).lower())[1]

    def can_process(self, file_path: str) -> bool:
        """Check if this processor can handle the given file.

        Args:
            file_path: Path to the file to check

        Returns:
            True if the path exists and its extension (case-insensitive) is
            one of ``SUPPORTED_EXTENSIONS``
        """
        if not os.path.exists(file_path):
            return False
        return self._extension(file_path) in self.SUPPORTED_EXTENSIONS

    def process(self, file_path: str) -> ConversionResult:
        """Process the DOCX file and return a conversion result.

        Files ending in ``.doc`` are routed to the pypandoc path; every other
        extension is treated as ``.docx``.

        Args:
            file_path: Path to the DOCX file to process

        Returns:
            ConversionResult containing the processed content

        Raises:
            FileNotFoundError: If the file doesn't exist.  This is the class
                imported from the package's ``exceptions`` module, which
                shadows the builtin of the same name inside this module.
            ConversionError: If processing fails
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        metadata: Dict[str, Any] = {
            "file_path": file_path,
            "file_size": os.path.getsize(file_path),
            "processor": "DOCXProcessor",
        }

        if self._extension(file_path) == '.doc':
            return self._process_doc_file(file_path, metadata)
        return self._process_docx_file(file_path, metadata)

    def _process_doc_file(self, file_path: str, metadata: Dict[str, Any]) -> ConversionResult:
        """Process .doc files using pypandoc.

        NOTE(review): pandoc's documented input formats list ``docx`` but not
        the legacy binary ``.doc`` format -- confirm that this path actually
        succeeds for real ``.doc`` files.

        Args:
            file_path: Path to the .doc file
            metadata: Metadata dict built by ``process``; updated in place
                with ``file_type`` and ``extractor``

        Returns:
            ConversionResult holding the cleaned Markdown and the metadata

        Raises:
            ConversionError: If pypandoc is not installed or conversion fails
        """
        try:
            import pypandoc
        except ImportError as exc:
            raise ConversionError("pypandoc is required for .doc file processing. Install it with: pip install pypandoc") from exc

        try:
            content = pypandoc.convert_file(file_path, 'markdown')
            metadata.update({
                "file_type": "doc",
                "extractor": "pypandoc",
            })
            return ConversionResult(self._clean_content(content), metadata)
        except Exception as exc:
            # Boundary of the converter: callers are promised ConversionError,
            # so wrap whatever went wrong and keep the original as the cause.
            raise ConversionError(f"Failed to process .doc file {file_path}: {str(exc)}") from exc

    def _process_docx_file(self, file_path: str, metadata: Dict[str, Any]) -> ConversionResult:
        """Process .docx files using python-docx with improved table extraction.

        All non-empty paragraphs are emitted first, then all tables; the
        relative position of tables within the document body is not preserved.

        Args:
            file_path: Path to the .docx file
            metadata: Metadata dict built by ``process``; updated in place
                with ``paragraph_count``, ``section_count``, ``file_type``
                and ``extractor``

        Returns:
            ConversionResult holding the cleaned Markdown and the metadata

        Raises:
            ConversionError: If python-docx is not installed or the document
                cannot be read or rendered
        """
        try:
            from docx import Document
        except ImportError as exc:
            raise ConversionError("python-docx is required for .docx file processing. Install it with: pip install python-docx") from exc

        try:
            doc = Document(file_path)
            metadata.update({
                "paragraph_count": len(doc.paragraphs),
                "section_count": len(doc.sections),
                "file_type": "docx",
                "extractor": "python-docx",
            })

            content_parts: List[str] = [
                self._format_paragraph(paragraph)
                for paragraph in doc.paragraphs
                if paragraph.text.strip()
            ]
            for table_idx, table in enumerate(doc.tables):
                content_parts.extend(self._render_table(table, table_idx))

            content = self._clean_content('\n'.join(content_parts))
            return ConversionResult(content, metadata)
        except Exception as exc:
            # Boundary of the converter: callers are promised ConversionError,
            # so wrap whatever went wrong and keep the original as the cause.
            raise ConversionError(f"Failed to process .docx file {file_path}: {str(exc)}") from exc

    @staticmethod
    def _format_paragraph(paragraph) -> str:
        """Render one paragraph, mapping ``Heading N`` styles to ``#`` headings."""
        text = paragraph.text
        style_name = getattr(paragraph.style, 'name', None) or ''
        if not style_name.startswith('Heading'):
            return text
        try:
            level = int(style_name.replace('Heading ', ''))
        except ValueError:
            # Heading-like style without a numeric level: default to level 2.
            return f"\n## {text}\n"
        # Markdown only has levels 1-6; clamping also keeps a level of 0 or
        # less from producing an empty '#' prefix.
        level = max(1, min(level, 6))
        return f"\n{'#' * level} {text}\n"

    @staticmethod
    def _has_merged_cells(rows) -> bool:
        """Return True if any cell carries a ``w:vMerge`` or ``w:gridSpan`` element."""
        for row in rows:
            for cell in row.cells:
                # _tc is a private python-docx attribute; it is used here
                # because merge markers are only visible in the cell's XML.
                tc = cell._tc
                if tc.xpath('.//w:vMerge') or tc.xpath('.//w:gridSpan'):
                    return True
        return False

    @staticmethod
    def _cell_text(cell) -> str:
        """Return a cell's text flattened to a single line that is safe inside a pipe table."""
        flat = cell.text.strip().replace('\n', ' ')
        # An unescaped '|' would be read as a column separator and shift
        # every following cell of the row.
        return flat.replace('|', '\\|')

    def _render_table(self, table, table_idx: int) -> List[str]:
        """Render one table as Markdown lines; a table without rows yields nothing.

        The first rendered row is used as the header row.  At most
        ``TABLE_ROW_LIMIT`` rows are rendered.  A ``### Table N`` caption is
        added only when the instance has a truthy ``preserve_layout``
        attribute.
        """
        rows = table.rows
        row_total = len(rows)
        if row_total == 0:
            # Checked before the caption so an empty table leaves no
            # dangling "### Table N" line behind.
            return []

        lines: List[str] = []
        if getattr(self, 'preserve_layout', False):
            lines.append(f"\n### Table {table_idx+1}\n")

        if self._has_merged_cells(rows):
            lines.append("*Warning: Table contains merged cells which may not render correctly in markdown.*\n")

        row_limit = self.TABLE_ROW_LIMIT
        if row_total > row_limit:
            lines.append(f"*Table truncated to first {row_limit} rows out of {row_total} total.*\n")

        table_data: List[List[str]] = []
        for index, row in enumerate(rows):
            if index >= row_limit:
                break
            table_data.append([self._cell_text(cell) for cell in row.cells])
        if not table_data:
            # Only reachable when TABLE_ROW_LIMIT was overridden to <= 0.
            return lines

        # Pad short rows so that every row has as many cells as the widest.
        width = max(len(cells) for cells in table_data)
        for cells in table_data:
            cells.extend([""] * (width - len(cells)))

        header, *body = table_data
        lines.append("| " + " | ".join(header) + " |")
        lines.append("| " + " | ".join(["---"] * width) + " |")
        lines.extend("| " + " | ".join(cells) + " |" for cells in body)
        lines.append("")
        return lines

    def _clean_content(self, content: str) -> str:
        """Clean up the extracted Word content.

        Runs of whitespace inside each line are collapsed to single spaces,
        blank lines are removed, and one blank line is put back in front of
        every level 2-6 heading line (except when it is the first line).
        Level-1 headings get no blank line, as before.

        Headings are recognised per line rather than by substring search:
        searching for ``'## '`` also matches inside ``'### '`` and in the
        middle of ordinary text, which used to split deeper headings apart.

        Args:
            content: Raw Word text content

        Returns:
            Cleaned text content
        """
        cleaned_lines: List[str] = []
        for raw_line in content.split('\n'):
            line = ' '.join(raw_line.split())
            if not line:
                continue
            if cleaned_lines and self._HEADING_LINE.match(line):
                cleaned_lines.append('')
            cleaned_lines.append(line)
        return '\n'.join(cleaned_lines)