import functools
import logging
import time
import traceback
from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable, List

from docling_core.types.doc import DoclingDocument, NodeItem

from docling.backend.abstract_backend import AbstractDocumentBackend
from docling.backend.pdf_backend import PdfDocumentBackend
from docling.datamodel.base_models import (
    ConversionStatus,
    DoclingComponentType,
    ErrorItem,
    Page,
)
from docling.datamodel.document import ConversionResult, InputDocument
from docling.datamodel.pipeline_options import PipelineOptions
from docling.datamodel.settings import settings
from docling.models.base_model import GenericEnrichmentModel
from docling.utils.profiling import ProfilingScope, TimeRecorder
from docling.utils.utils import chunkify

_log = logging.getLogger(__name__)


class BasePipeline(ABC):
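    """Abstract base for conversion pipelines.

    `execute` runs `_build_document`, `_assemble_document`, and
    `_enrich_document` in order, then sets the result status via
    `_determine_status`; `_unload` always runs in a `finally` block.
    """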

    def __init__(self, pipeline_options: PipelineOptions):
        self.pipeline_options = pipeline_options
        self.keep_images = False
        self.build_pipe: List[Callable] = []
        self.enrichment_pipe: List[GenericEnrichmentModel[Any]] = []

    def execute(self, in_doc: InputDocument, raises_on_error: bool) -> ConversionResult:
        conv_res = ConversionResult(input=in_doc)

        _log.info(f"Processing document {in_doc.file.name}")
        try:
            with TimeRecorder(
                conv_res, "pipeline_total", scope=ProfilingScope.DOCUMENT
            ):
                # These steps are building and assembling the structure of the
                # output DoclingDocument.
                conv_res = self._build_document(conv_res)
                conv_res = self._assemble_document(conv_res)

                # From this stage, all operations should rely only on conv_res.output
                conv_res = self._enrich_document(conv_res)
                conv_res.status = self._determine_status(conv_res)
        except Exception as e:
            conv_res.status = ConversionStatus.FAILURE
            if raises_on_error:
                raise e
        finally:
            self._unload(conv_res)

        return conv_res
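
    # Typical call site (sketch): a concrete subclass is constructed with its
    # options and run once per document, e.g.
    #   pipeline = MyPipeline(MyPipeline.get_default_options())
    #   result = pipeline.execute(in_doc, raises_on_error=True)
    # `MyPipeline` is illustrative; in docling, DocumentConverter normally
    # drives this call.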

    @abstractmethod
    def _build_document(self, conv_res: ConversionResult) -> ConversionResult:
        pass

    def _assemble_document(self, conv_res: ConversionResult) -> ConversionResult:
        return conv_res

    def _enrich_document(self, conv_res: ConversionResult) -> ConversionResult:
        def _prepare_elements(
            conv_res: ConversionResult, model: GenericEnrichmentModel[Any]
        ) -> Iterable[NodeItem]:
            for doc_element, _level in conv_res.document.iterate_items():
                prepared_element = model.prepare_element(
                    conv_res=conv_res, element=doc_element
                )
                if prepared_element is not None:
                    yield prepared_element

        with TimeRecorder(conv_res, "doc_enrich", scope=ProfilingScope.DOCUMENT):
            for model in self.enrichment_pipe:
                for element_batch in chunkify(
                    _prepare_elements(conv_res, model),
                    model.elements_batch_size,
                ):
                    for element in model(
                        doc=conv_res.document, element_batch=element_batch
                    ):  # Must exhaust!
                        pass

        return conv_res
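
    # Sketch of an enrichment model as consumed above. The interface is
    # inferred from this method: `prepare_element` returns a prepared element
    # or None to skip it, `elements_batch_size` sizes the batches, and calling
    # the model yields the processed items (which the loop above must
    # exhaust). The class name and body are illustrative only:
    #
    #   class NoOpEnrichmentModel(GenericEnrichmentModel[NodeItem]):
    #       elements_batch_size: int = 16
    #
    #       def prepare_element(self, conv_res, element):
    #           return element  # or None to skip this element
    #
    #       def __call__(self, doc, element_batch):
    #           yield from element_batch  # real models would enrich here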

    @abstractmethod
    def _determine_status(self, conv_res: ConversionResult) -> ConversionStatus:
        pass

    def _unload(self, conv_res: ConversionResult):
        pass

    @classmethod
    @abstractmethod
    def get_default_options(cls) -> PipelineOptions:
        pass

    @classmethod
    @abstractmethod
    def is_backend_supported(cls, backend: AbstractDocumentBackend):
        pass

    # def _apply_on_elements(self, element_batch: Iterable[NodeItem]) -> Iterable[Any]:
    #     for model in self.build_pipe:
    #         element_batch = model(element_batch)
    #
    #     yield from element_batch


class PaginatedPipeline(BasePipeline):  # TODO this is a bad name.
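    """Base pipeline for backends that expose a document page by page.

    Specializes `_build_document` to initialize pages, run the `build_pipe`
    models over page batches, and enforce the optional document timeout.
    """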

    def __init__(self, pipeline_options: PipelineOptions):
        super().__init__(pipeline_options)
        self.keep_backend = False

    def _apply_on_pages(
        self, conv_res: ConversionResult, page_batch: Iterable[Page]
    ) -> Iterable[Page]:
        for model in self.build_pipe:
            page_batch = model(conv_res, page_batch)

        yield from page_batch
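
    # Note: each model in build_pipe wraps the previous iterator, so the chain
    # above is lazy; pages are only processed when the returned iterable is
    # exhausted (as done in `_build_document` below).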

    def _build_document(self, conv_res: ConversionResult) -> ConversionResult:
        if not isinstance(conv_res.input._backend, PdfDocumentBackend):
            raise RuntimeError(
                f"The selected backend {type(conv_res.input._backend).__name__} for {conv_res.input.file} is not a PDF backend. "
                f"Cannot convert this with a PDF pipeline. "
                f"Please check your format configuration on DocumentConverter."
            )
            # conv_res.status = ConversionStatus.FAILURE
            # return conv_res

        total_elapsed_time = 0.0
        with TimeRecorder(conv_res, "doc_build", scope=ProfilingScope.DOCUMENT):
            for i in range(0, conv_res.input.page_count):
                start_page, end_page = conv_res.input.limits.page_range
                # page_range is 1-based; i is a 0-based page index.
                if (start_page - 1) <= i <= (end_page - 1):
                    conv_res.pages.append(Page(page_no=i))

            try:
                # Iterate batches of pages (page_batch_size) in the doc
                for page_batch in chunkify(
                    conv_res.pages, settings.perf.page_batch_size
                ):
                    start_batch_time = time.monotonic()

                    # 1. Initialise the page resources
                    init_pages = map(
                        functools.partial(self.initialize_page, conv_res), page_batch
                    )

                    # 2. Run pipeline stages
                    pipeline_pages = self._apply_on_pages(conv_res, init_pages)

                    for p in pipeline_pages:  # Must exhaust!
                        # Cleanup cached images
                        if not self.keep_images:
                            p._image_cache = {}

                        # Cleanup page backends
                        if not self.keep_backend and p._backend is not None:
                            p._backend.unload()

                    end_batch_time = time.monotonic()
                    total_elapsed_time += end_batch_time - start_batch_time
                    if (
                        self.pipeline_options.document_timeout is not None
                        and total_elapsed_time > self.pipeline_options.document_timeout
                    ):
                        _log.warning(
                            f"Document processing time ({total_elapsed_time:.3f} seconds) exceeded the specified timeout of {self.pipeline_options.document_timeout:.3f} seconds"
                        )
                        conv_res.status = ConversionStatus.PARTIAL_SUCCESS
                        break

                    _log.debug(
                        f"Finished converting page batch time={end_batch_time:.3f}"
                    )
            except Exception as e:
                conv_res.status = ConversionStatus.FAILURE
                trace = "\n".join(
                    traceback.format_exception(type(e), e, e.__traceback__)
                )
                _log.warning(
                    f"Encountered an error during conversion of document {conv_res.input.document_hash}:\n"
                    f"{trace}"
                )
                raise e

        return conv_res

    def _unload(self, conv_res: ConversionResult) -> ConversionResult:
        for page in conv_res.pages:
            if page._backend is not None:
                page._backend.unload()

        if conv_res.input._backend:
            conv_res.input._backend.unload()

        return conv_res

    def _determine_status(self, conv_res: ConversionResult) -> ConversionStatus:
        status = ConversionStatus.SUCCESS
        for page in conv_res.pages:
            if page._backend is None or not page._backend.is_valid():
                conv_res.errors.append(
                    ErrorItem(
                        component_type=DoclingComponentType.DOCUMENT_BACKEND,
                        module_name=type(page._backend).__name__,
                        error_message=f"Page {page.page_no} failed to parse.",
                    )
                )
                status = ConversionStatus.PARTIAL_SUCCESS

        return status

    # Initialise and load resources for a page
    @abstractmethod
    def initialize_page(self, conv_res: ConversionResult, page: Page) -> Page:
        pass
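

# A minimal concrete subclass, sketched to illustrate the abstract surface
# above. `MyPdfPipeline` is not part of docling, and the sketch assumes the
# PDF backend exposes `load_page` as docling's PdfDocumentBackend does:
#
#   class MyPdfPipeline(PaginatedPipeline):
#       def initialize_page(self, conv_res: ConversionResult, page: Page) -> Page:
#           page._backend = conv_res.input._backend.load_page(page.page_no)
#           return page
#
#       @classmethod
#       def get_default_options(cls) -> PipelineOptions:
#           return PipelineOptions()
#
#       @classmethod
#       def is_backend_supported(cls, backend: AbstractDocumentBackend):
#           return isinstance(backend, PdfDocumentBackend)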