Spaces:
Running
Running
File size: 6,339 Bytes
15fdcff 5c197b6 15fdcff fdbfd73 15fdcff 8c92c5f 15fdcff 5c197b6 fdbfd73 15fdcff fdbfd73 15fdcff fdbfd73 15fdcff 8c92c5f fdbfd73 8c92c5f fdbfd73 15fdcff fdbfd73 15fdcff fdbfd73 15fdcff fdbfd73 15fdcff fdbfd73 15fdcff 070e4b3 fdbfd73 15fdcff fdbfd73 15fdcff fdbfd73 15fdcff |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
import os
from pathlib import Path
from typing import Optional, Dict, Any, Union
import magic
from docling.document_converter import DocumentConverter
from datetime import datetime
import shutil
import tempfile
from .types import ParsedDocument, DocumentMetadata
from .exceptions import UnsupportedFormatError, ParseError
class DocumentParser:
    """
    A multiformat document parser using Docling.

    Each input file is copied into a per-instance temporary directory under a
    normalised extension and then handed to a Docling ``DocumentConverter``.
    The temporary directory is removed when the parser object is destroyed.
    """

    # MIME type -> short format name used for the temp-file extension and
    # for ``DocumentMetadata.file_type``.
    SUPPORTED_FORMATS = {
        'application/pdf': 'pdf',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
        'text/plain': 'txt',
        'text/html': 'html',
        'text/markdown': 'md',
        # Add common variations
        'application/x-pdf': 'pdf',
        'application/acrobat': 'pdf',
        # NOTE(review): legacy .doc content is relabelled as docx here;
        # confirm the converter can actually read it under that extension.
        'application/msword': 'docx',
        'text/x-markdown': 'md',
        'text/x-html': 'html',
    }

    # File extension (lower-case, with leading dot) -> canonical MIME type.
    EXTENSION_TO_MIME = {
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.txt': 'text/plain',
        '.html': 'text/html',
        '.htm': 'text/html',
        '.md': 'text/markdown',
        '.markdown': 'text/markdown',
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Create a parser.

        Args:
            config: Optional settings dictionary; stored on ``self.config``
                (an empty dict is used when omitted).
        """
        self.config = config or {}
        self.converter = DocumentConverter()
        # Create a temporary directory for processing files
        self.temp_dir = Path(tempfile.mkdtemp(prefix="dockling_"))

    def __del__(self):
        """Cleanup temporary directory on object destruction"""
        # getattr guards against a partially-initialised instance
        # (e.g. __init__ raised before temp_dir was assigned).
        temp_dir = getattr(self, 'temp_dir', None)
        if temp_dir is not None and temp_dir.exists():
            shutil.rmtree(temp_dir, ignore_errors=True)

    def _validate_and_copy_file(self, file_path: Union[str, Path]) -> Path:
        """
        Validate file and copy to temporary location with correct extension

        The extension is trusted when it appears in ``EXTENSION_TO_MIME``;
        otherwise the content is sniffed with libmagic and the extension is
        derived from the detected MIME type.

        Args:
            file_path: Path to the source document.
        Returns:
            Path of the copy inside ``self.temp_dir`` (named ``doc<ext>``).
        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            UnsupportedFormatError: If neither the extension nor the detected
                MIME type is supported.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Try to determine format from extension first
        extension = file_path.suffix.lower()
        if extension not in self.EXTENSION_TO_MIME:
            # Extension not recognized: fall back to content sniffing
            mime_type = magic.from_file(str(file_path), mime=True)
            if mime_type not in self.SUPPORTED_FORMATS:
                # sorted() keeps the message stable between runs
                supported = ', '.join(sorted(set(self.SUPPORTED_FORMATS.values())))
                raise UnsupportedFormatError(
                    f"Unsupported file format: {mime_type}. "
                    f"Supported formats are: {supported}"
                )
            extension = f".{self.SUPPORTED_FORMATS[mime_type]}"

        # Copy file to temp directory with correct extension
        temp_file = self.temp_dir / f"doc{extension}"
        shutil.copy2(file_path, temp_file)
        return temp_file

    def _resolve_mime_type(self, detected_mime: str, extension: str) -> str:
        """
        Pick the MIME type to report for a file.

        The libmagic result is used when it is a supported type. Otherwise
        (e.g. a .docx reported as a zip archive, or an empty text file) the
        MIME type implied by the file extension is used instead.

        Args:
            detected_mime: MIME type reported by content sniffing.
            extension: File extension including the leading dot.
        Returns:
            A key of ``SUPPORTED_FORMATS``.
        Raises:
            UnsupportedFormatError: If neither input maps to a supported type.
        """
        if detected_mime in self.SUPPORTED_FORMATS:
            return detected_mime
        fallback = self.EXTENSION_TO_MIME.get(extension.lower())
        if fallback is None:
            raise UnsupportedFormatError(
                f"Unsupported file format: {detected_mime}. "
                f"Supported formats are: "
                f"{', '.join(sorted(set(self.SUPPORTED_FORMATS.values())))}"
            )
        return fallback

    def parse(self, file_path: Union[str, Path]) -> "ParsedDocument":
        """
        Parse a document file and return structured content

        Args:
            file_path: Path to the document file
        Returns:
            ParsedDocument object containing parsed content and metadata
        Raises:
            UnsupportedFormatError: If the file format is not supported
            ParseError: If parsing fails (any other failure, including a
                missing file, is reported as ParseError)
        """
        # Sentinel so the finally-clause knows whether a copy was made.
        temp_file: Optional[Path] = None
        try:
            # Validate and prepare file
            temp_file = self._validate_and_copy_file(file_path)

            # Get file metadata from the ORIGINAL file: the temp copy's
            # st_ctime would only reflect the moment of copying.
            source_path = Path(file_path)
            stats = source_path.stat()
            detected_mime = magic.from_file(str(temp_file), mime=True)
            # Avoid a KeyError when libmagic disagrees with the extension.
            mime_type = self._resolve_mime_type(detected_mime, temp_file.suffix)
            metadata = DocumentMetadata(
                filename=source_path.name,  # Use original filename
                file_type=self.SUPPORTED_FORMATS[mime_type],
                size_bytes=stats.st_size,
                created_at=datetime.fromtimestamp(stats.st_ctime),
                modified_at=datetime.fromtimestamp(stats.st_mtime),
                mime_type=mime_type,
            )

            try:
                # Parse document using Docling
                result = self.converter.convert(str(temp_file))
                doc = result.document

                # Extract content using proper methods
                try:
                    content = doc.export_to_text()
                except Exception as e:
                    raise ParseError(
                        f"Failed to extract text content: {str(e)}"
                    ) from e

                # Extract structured content; attributes the document object
                # does not expose are replaced by empty containers.
                structured_content = {
                    'sections': getattr(doc, 'sections', []),
                    'paragraphs': getattr(doc, 'paragraphs', []),
                    'entities': getattr(doc, 'entities', {}),
                    'metadata': getattr(doc, 'metadata', {}),
                }

                # Get raw text if available; fall back to the plain export
                # when the layout-preserving call is not supported or fails.
                try:
                    raw_text = doc.export_to_text(include_layout=True)
                except Exception:
                    raw_text = content

                # Update metadata with document-specific information
                doc_metadata = getattr(doc, 'metadata', None)
                if doc_metadata:
                    metadata.title = doc_metadata.get('title')
                    metadata.author = doc_metadata.get('author')
                    metadata.pages = doc_metadata.get('pages')
                    metadata.extra.update(doc_metadata)

                return ParsedDocument(
                    content=content,
                    metadata=metadata,
                    raw_text=raw_text,
                    structured_content=structured_content,
                    confidence_score=getattr(doc, 'confidence', 1.0),
                )
            except Exception as e:
                raise ParseError(f"Failed to parse document: {str(e)}") from e
        except (UnsupportedFormatError, ParseError):
            # Already the documented exception types: propagate unchanged
            # instead of re-wrapping them.
            raise
        except Exception as e:
            raise ParseError(str(e)) from e
        finally:
            # Cleanup temporary files (best effort; the whole temp directory
            # is removed when the parser is destroyed anyway).
            if temp_file is not None:
                try:
                    temp_file.unlink()
                except OSError:
                    pass

    def supports_format(self, mime_type: str) -> bool:
        """Check if a given MIME type is supported"""
        return mime_type in self.SUPPORTED_FORMATS