# Import spaces module for ZeroGPU support - Must be first import
try:
    import spaces
    HAS_SPACES = True
except ImportError:
    HAS_SPACES = False

import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Any, Union, Set

# Force CPU-only mode for EasyOCR and other CUDA libraries
os.environ['CUDA_VISIBLE_DEVICES'] = ''
os.environ['USE_TORCH'] = '1'
os.environ['EASYOCR_GPU'] = 'False'

# Import the parser interface and registry
from src.parsers.parser_interface import DocumentParser
from src.parsers.parser_registry import ParserRegistry
from src.core.exceptions import DocumentProcessingError, ParserError
from src.core.config import config

# Check for Docling availability
try:
    from docling.document_converter import DocumentConverter
    from docling.datamodel.base_models import InputFormat
    from docling.datamodel.pipeline_options import PdfPipelineOptions, EasyOcrOptions, TesseractOcrOptions
    from docling.document_converter import PdfFormatOption
    from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
    HAS_DOCLING = True
except ImportError:
    HAS_DOCLING = False
    logging.warning("Docling package not installed. Please install with 'pip install docling'")

# Gemini availability
try:
    from google import genai
    HAS_GEMINI = True
except ImportError:
    HAS_GEMINI = False

# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class DoclingParser(DocumentParser):
    """
    Parser implementation using Docling for converting documents to Markdown.
    Supports advanced PDF understanding, OCR, and multiple document formats.
    """
    
    def __init__(self):
        super().__init__()  # Initialize the base class (including _cancellation_flag)
        self.converter = None
        self.gpu_converter = None
        
        # Don't initialize converters here to avoid CUDA issues
        # They will be created on-demand in the parse methods
        logger.info("Docling parser initialized (converters will be created on-demand)")
    
    def _create_converter_with_options(self, ocr_method: str, **kwargs) -> DocumentConverter:
        """Create a DocumentConverter with specific OCR options."""
        pipeline_options = PdfPipelineOptions()
        
        # Enable OCR by default
        pipeline_options.do_ocr = True
        
        # Configure OCR method (EasyOCR is the default)
        if ocr_method == "docling_tesseract":
            pipeline_options.ocr_options = TesseractOcrOptions()
        else:
            pipeline_options.ocr_options = EasyOcrOptions()
        
        # Configure advanced features
        pipeline_options.do_table_structure = kwargs.get('enable_tables', True)
        pipeline_options.do_code_enrichment = kwargs.get('enable_code_enrichment', False)
        pipeline_options.do_formula_enrichment = kwargs.get('enable_formula_enrichment', False)
        pipeline_options.do_picture_classification = kwargs.get('enable_picture_classification', False)
        pipeline_options.generate_picture_images = kwargs.get('generate_picture_images', False)
        
        # Create converter with options
        converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
            }
        )
        
        return converter
    
    def parse(self, file_path: Union[str, Path], ocr_method: Optional[str] = None, **kwargs) -> str:
        """
        Parse a document and return its content as Markdown.
        
        Args:
            file_path: Path to the document
            ocr_method: OCR method to use ('docling_default', 'docling_tesseract', 'docling_easyocr')
            **kwargs: Additional options for Docling processing
        
        Returns:
            str: Markdown representation of the document
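
        Example (illustrative sketch; 'document.pdf' is a placeholder path):
            parser = DoclingParser()
            markdown = parser.parse("document.pdf", ocr_method="docling_easyocr")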
        """
        # Validate file first
        self.validate_file(file_path)
        
        # Check if Docling is available
        if not HAS_DOCLING:
            raise ParserError("Docling is not available. Please install with 'pip install docling'")
        
        # Check for cancellation before starting
        if self._check_cancellation():
            raise DocumentProcessingError("Conversion cancelled")
        
        try:
            # Try ZeroGPU first, fallback to CPU
            if HAS_SPACES:
                try:
                    logger.info("Attempting Docling processing with ZeroGPU")
                    # Filter kwargs so only picklable values are sent across the
                    # ZeroGPU process boundary (spaces pickles the arguments)
                    import pickle
                    safe_kwargs = {}
                    for key, value in kwargs.items():
                        if not key.startswith('_') and not callable(value):
                            try:
                                pickle.dumps(value)
                                safe_kwargs[key] = value
                            except (TypeError, pickle.PickleError):
                                logger.debug(f"Skipping unpicklable kwarg: {key}")
                    
                    result = self._process_with_gpu(str(file_path), ocr_method, **safe_kwargs)
                    return result
                except Exception as e:
                    if "pickle" in str(e).lower():
                        logger.warning(f"ZeroGPU pickle error: {str(e)}")
                    elif "cuda" in str(e).lower():
                        logger.warning(f"ZeroGPU CUDA error: {str(e)}")
                    else:
                        logger.warning(f"ZeroGPU processing failed: {str(e)}")
                    logger.info("Falling back to CPU processing")
            
            # Fallback to CPU processing
            result = self._process_with_cpu(str(file_path), ocr_method, **kwargs)
            return result
            
        except Exception as e:
            logger.error(f"Error converting file with Docling: {str(e)}")
            raise DocumentProcessingError(f"Docling conversion failed: {str(e)}")
    
    def _process_with_cpu(self, file_path: str, ocr_method: Optional[str] = None, **kwargs) -> str:
        """Process document with CPU-only Docling converter."""
        logger.info("Processing with CPU-only Docling converter")
        
        # Create CPU converter if not exists
        if self.converter is None:
            self.converter = self._create_cpu_converter(ocr_method, **kwargs)
        
        # Convert the document
        result = self.converter.convert(file_path)
        
        # Check for cancellation after processing
        if self._check_cancellation():
            raise DocumentProcessingError("Conversion cancelled")
        
        # Export to markdown
        return result.document.export_to_markdown()
    
    def _create_cpu_converter(self, ocr_method: Optional[str] = None, **kwargs) -> DocumentConverter:
        """Create a CPU-only DocumentConverter with proper OCR fallback."""
        # Configure CPU-only accelerator
        accelerator_options = AcceleratorOptions(
            num_threads=4, 
            device=AcceleratorDevice.CPU
        )
        
        # Create pipeline options with CPU-only accelerator
        pipeline_options = PdfPipelineOptions()
        pipeline_options.accelerator_options = accelerator_options
        pipeline_options.do_ocr = True
        pipeline_options.table_structure_options.do_cell_matching = True
        
        # Configure OCR method - use EasyOCR with CPU enforcement
        pipeline_options.ocr_options = EasyOcrOptions()
        logger.info("Using EasyOCR (CPU-only)")
        
        # Configure advanced features
        pipeline_options.do_table_structure = kwargs.get('enable_tables', True)
        pipeline_options.do_code_enrichment = kwargs.get('enable_code_enrichment', False)
        pipeline_options.do_formula_enrichment = kwargs.get('enable_formula_enrichment', False)
        pipeline_options.do_picture_classification = kwargs.get('enable_picture_classification', False)
        pipeline_options.generate_picture_images = kwargs.get('generate_picture_images', False)
        
        # Create converter with CPU-only configuration
        return DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_options=pipeline_options,
                )
            }
        )
    
    # Define the GPU-decorated function for ZeroGPU
    if HAS_SPACES:
        @spaces.GPU(duration=120)  # Allocate GPU for up to 2 minutes
        def _process_with_gpu(self, file_path: str, ocr_method: Optional[str] = None, **kwargs) -> str:
            """Process document with GPU-accelerated Docling converter.
            
            IMPORTANT: All model loading and CUDA operations must happen inside this method.
            """
            logger.info("Processing with ZeroGPU allocation")
            
            # Configure GPU accelerator
            accelerator_options = AcceleratorOptions(
                num_threads=4, 
                device=AcceleratorDevice.CUDA
            )
            
            # Create pipeline options with GPU accelerator
            pipeline_options = PdfPipelineOptions()
            pipeline_options.accelerator_options = accelerator_options
            pipeline_options.do_ocr = True
            pipeline_options.table_structure_options.do_cell_matching = True
            
            # Configure OCR method - use EasyOCR
            pipeline_options.ocr_options = EasyOcrOptions()
            
            # Configure advanced features
            pipeline_options.do_table_structure = kwargs.get('enable_tables', True)
            pipeline_options.do_code_enrichment = kwargs.get('enable_code_enrichment', False)
            pipeline_options.do_formula_enrichment = kwargs.get('enable_formula_enrichment', False)
            pipeline_options.do_picture_classification = kwargs.get('enable_picture_classification', False)
            pipeline_options.generate_picture_images = kwargs.get('generate_picture_images', False)
            
            # Create converter with GPU configuration inside the decorated function
            converter = DocumentConverter(
                format_options={
                    InputFormat.PDF: PdfFormatOption(
                        pipeline_options=pipeline_options,
                    )
                }
            )
            
            # Convert the document
            result = converter.convert(file_path)
            
            # Export to markdown
            markdown_content = result.document.export_to_markdown()
            
            # Clean up to free memory
            del converter
            import gc
            gc.collect()
            
            return markdown_content
    else:
        # Define a fallback under the same name when spaces is unavailable;
        # it simply delegates to the CPU processing path
        def _process_with_gpu(self, file_path: str, ocr_method: Optional[str] = None, **kwargs) -> str:
            return self._process_with_cpu(file_path, ocr_method, **kwargs)
    
    @classmethod
    def get_name(cls) -> str:
        return "Docling"
    
    @classmethod
    def get_supported_file_types(cls) -> Set[str]:
        """Return a set of supported file extensions."""
        return {
            # PDF files
            ".pdf",
            # Image files
            ".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".webp",
            # Office documents
            ".docx", ".xlsx", ".pptx",
            # Web and markup
            ".html", ".xhtml", ".md",
            # Other formats
            ".csv"
        }
    
    @classmethod
    def is_available(cls) -> bool:
        """Check if this parser is available."""
        return HAS_DOCLING
    
    @classmethod
    def get_supported_ocr_methods(cls) -> List[Dict[str, Any]]:
        """Return list of supported OCR methods."""
        return [
            {
                "id": "docling_default",
                "name": "EasyOCR",
                "default_params": {
                    "enable_tables": True,
                    "enable_code_enrichment": False,
                    "enable_formula_enrichment": False,
                    "enable_picture_classification": False,
                    "generate_picture_images": False
                }
            }
        ]
    
    @classmethod
    def get_description(cls) -> str:
        return "Docling parser with advanced PDF understanding, table structure recognition, and multiple OCR engines"

    def _validate_batch_files(self, file_paths: List[Path]) -> None:
        """Validate batch of files (size, count, type) for multi-document processing."""
        if len(file_paths) == 0:
            raise DocumentProcessingError("No files provided for processing")
        if len(file_paths) > 5:
            raise DocumentProcessingError("Maximum 5 files allowed for batch processing")

        total_size = 0
        for fp in file_paths:
            if not fp.exists():
                raise DocumentProcessingError(f"File not found: {fp}")
            size = fp.stat().st_size
            if size > 10 * 1024 * 1024:  # 10 MB
                raise DocumentProcessingError(f"Individual file size exceeds 10MB: {fp.name}")
            total_size += size
        if total_size > 20 * 1024 * 1024:
            raise DocumentProcessingError(f"Combined file size ({total_size / (1024*1024):.1f}MB) exceeds 20MB limit")

    def _create_batch_prompt(self, file_paths: List[Path], processing_type: str, original_filenames: Optional[List[str]] = None) -> str:
        """Create a natural-language prompt for Gemini post-processing."""
        names = original_filenames if original_filenames else [p.name for p in file_paths]
        file_list = "\n".join(f"- {n}" for n in names)
        base = f"I will provide you with {len(file_paths)} documents:\n{file_list}\n\n"
        if processing_type == "combined":
            return base + "Merge the content into a single coherent markdown document, preserving structure."
        if processing_type == "individual":
            return base + "Convert each document to markdown under its own heading."
        if processing_type == "summary":
            return base + "Create an EXECUTIVE SUMMARY followed by detailed markdown conversions per document."
        if processing_type == "comparison":
            return base + "Provide a comparison table of the documents, individual summaries, and cross-document insights."
        # default fallback
        return base

    def _format_batch_output(self, response_text: str, file_paths: List[Path], processing_type: str, original_filenames: Optional[List[str]] = None) -> str:
        names = original_filenames if original_filenames else [p.name for p in file_paths]
        header = (
             f"<!-- Multi-Document Processing Results -->\n"
             f"<!-- Processing Type: {processing_type} -->\n"
             f"<!-- Files Processed: {len(file_paths)} -->\n"
             f"<!-- File Names: {', '.join(names)} -->\n\n"
         )
        # Ensure response_text is a string to avoid TypeError when it is None
        safe_resp = "" if response_text is None else str(response_text)
        return header + safe_resp

    def _convert_batch_with_docling(self, paths: List[Path], ocr_method: Optional[str], **kwargs) -> List[str]:
        """Run Docling conversion on a list of Paths and return markdown list."""
        if self._check_cancellation():
            raise DocumentProcessingError("Conversion cancelled")

        # Create CPU converter for batch processing (GPU not supported for batch yet)
        converter = self._create_cpu_converter(ocr_method, **kwargs)

        # Convert all docs
        from docling.datamodel.base_models import ConversionStatus
        markdown_results: List[str] = []
        conv_results = converter.convert_all([str(p) for p in paths], raises_on_error=False)

        for idx, conv_res in enumerate(conv_results):
            if self._check_cancellation():
                raise DocumentProcessingError("Conversion cancelled")

            if conv_res.status in (ConversionStatus.SUCCESS, ConversionStatus.PARTIAL_SUCCESS):
                markdown_results.append(conv_res.document.export_to_markdown())
            else:
                raise DocumentProcessingError(f"Docling failed to convert {paths[idx].name}")
        return markdown_results

    def parse_multiple(
        self,
        file_paths: List[Union[str, Path]],
        processing_type: str = "combined",
        original_filenames: Optional[List[str]] = None,
        ocr_method: Optional[str] = None,
        output_format: str = "markdown",
        **kwargs,
    ) -> str:
        """Multi-document processing using Docling + optional Gemini summarisation/comparison."""
        if not HAS_DOCLING:
            raise ParserError("Docling package not installed")

        paths = [Path(p) for p in file_paths]
        self._validate_batch_files(paths)

        # Run Docling conversion
        markdown_list = self._convert_batch_with_docling(paths, ocr_method, **kwargs)

        # LOCAL composition for combined/individual
        if processing_type in ("combined", "individual"):
            if processing_type == "individual":
                names = original_filenames if original_filenames else [p.name for p in paths]
                sections = [f"# Document {i+1}: {n}\n\n{md}" for i, (n, md) in enumerate(zip(names, markdown_list), 1)]
                combined = "\n\n---\n\n".join(sections)
            else:
                combined = "\n\n---\n\n".join(markdown_list)
            return self._format_batch_output(combined, paths, processing_type, original_filenames)

        # SUMMARY / COMPARISON → Gemini post-processing (model taken from config)
        if not HAS_GEMINI or not config.api.google_api_key:
            raise DocumentProcessingError("Gemini API not available for summary/comparison post-processing")

        prompt = self._create_batch_prompt(paths, processing_type, original_filenames)
        combined_md = "\n\n---\n\n".join(markdown_list)

        try:
            client = genai.Client(api_key=config.api.google_api_key)
            response = client.models.generate_content(
                model=config.model.gemini_model,
                contents=[prompt + "\n\n" + combined_md],
                config={
                    "temperature": config.model.temperature,
                    "top_p": 0.95,
                    "top_k": 40,
                    "max_output_tokens": config.model.max_tokens,
                },
            )
            
            # Debug logging for response structure
            logger.debug(f"Gemini response type: {type(response)}")
            logger.debug(f"Gemini response attributes: {dir(response)}")
            
            # Try different ways to extract text from response
            final_text = None
            if hasattr(response, "text") and response.text:
                final_text = response.text
            elif hasattr(response, "candidates") and response.candidates:
                # Try to get text from first candidate
                candidate = response.candidates[0]
                if hasattr(candidate, "content") and candidate.content:
                    if hasattr(candidate.content, "parts") and candidate.content.parts:
                        final_text = candidate.content.parts[0].text
                    elif hasattr(candidate.content, "text"):
                        final_text = candidate.content.text
                elif hasattr(candidate, "text"):
                    final_text = candidate.text
            elif hasattr(response, "content") and response.content:
                final_text = str(response.content)
            
            if not final_text:
                logger.error(f"No text found in Gemini response. Response: {response}")
                raise DocumentProcessingError("Gemini post-processing returned no text")
                
        except Exception as e:
            logger.error(f"Gemini API error: {str(e)}")
            raise DocumentProcessingError(f"Gemini post-processing failed: {str(e)}")

        return self._format_batch_output(final_text, paths, processing_type, original_filenames)


# Register the parser with the registry if available
if HAS_DOCLING:
    ParserRegistry.register(DoclingParser)
    logger.info("Docling parser registered successfully")
else:
    logger.warning("Could not register Docling parser: Package not installed")