|
|
import logging |
|
|
import warnings |
|
|
import zipfile |
|
|
from pathlib import Path |
|
|
from typing import Iterable, List, Optional |
|
|
|
|
|
import numpy |
|
|
from docling_core.types.doc import BoundingBox, CoordOrigin |
|
|
|
|
|
from docling.datamodel.base_models import Cell, OcrCell, Page |
|
|
from docling.datamodel.document import ConversionResult |
|
|
from docling.datamodel.pipeline_options import ( |
|
|
AcceleratorDevice, |
|
|
AcceleratorOptions, |
|
|
EasyOcrOptions, |
|
|
) |
|
|
from docling.datamodel.settings import settings |
|
|
from docling.models.base_ocr_model import BaseOcrModel |
|
|
from docling.utils.accelerator_utils import decide_device |
|
|
from docling.utils.profiling import TimeRecorder |
|
|
from docling.utils.utils import download_url_with_progress |
|
|
|
|
|
_log = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class EasyOcrModel(BaseOcrModel): |
|
|
_model_repo_folder = "EasyOcr" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
enabled: bool, |
|
|
artifacts_path: Optional[Path], |
|
|
options: EasyOcrOptions, |
|
|
accelerator_options: AcceleratorOptions, |
|
|
): |
|
|
super().__init__(enabled=enabled, options=options) |
|
|
self.options: EasyOcrOptions |
|
|
|
|
|
self.scale = 3 |
|
|
|
|
|
if self.enabled: |
|
|
try: |
|
|
import easyocr |
|
|
except ImportError: |
|
|
raise ImportError( |
|
|
"EasyOCR is not installed. Please install it via `pip install easyocr` to use this OCR engine. " |
|
|
"Alternatively, Docling has support for other OCR engines. See the documentation." |
|
|
) |
|
|
|
|
|
if self.options.use_gpu is None: |
|
|
device = decide_device(accelerator_options.device) |
|
|
|
|
|
use_gpu = any( |
|
|
[ |
|
|
device.startswith(x) |
|
|
for x in [ |
|
|
AcceleratorDevice.CUDA.value, |
|
|
AcceleratorDevice.MPS.value, |
|
|
] |
|
|
] |
|
|
) |
|
|
else: |
|
|
warnings.warn( |
|
|
"Deprecated field. Better to set the `accelerator_options.device` in `pipeline_options`. " |
|
|
"When `use_gpu and accelerator_options.device == AcceleratorDevice.CUDA` the GPU is used " |
|
|
"to run EasyOCR. Otherwise, EasyOCR runs in CPU." |
|
|
) |
|
|
use_gpu = self.options.use_gpu |
|
|
|
|
|
download_enabled = self.options.download_enabled |
|
|
model_storage_directory = self.options.model_storage_directory |
|
|
if artifacts_path is not None and model_storage_directory is None: |
|
|
download_enabled = False |
|
|
model_storage_directory = str(artifacts_path / self._model_repo_folder) |
|
|
|
|
|
self.reader = easyocr.Reader( |
|
|
lang_list=self.options.lang, |
|
|
gpu=use_gpu, |
|
|
model_storage_directory=model_storage_directory, |
|
|
recog_network=self.options.recog_network, |
|
|
download_enabled=download_enabled, |
|
|
verbose=False, |
|
|
) |
|
|
|
|
|
@staticmethod |
|
|
def download_models( |
|
|
detection_models: List[str] = ["craft"], |
|
|
recognition_models: List[str] = ["english_g2", "latin_g2"], |
|
|
local_dir: Optional[Path] = None, |
|
|
force: bool = False, |
|
|
progress: bool = False, |
|
|
) -> Path: |
|
|
|
|
|
from easyocr.config import detection_models as det_models_dict |
|
|
from easyocr.config import recognition_models as rec_models_dict |
|
|
|
|
|
if local_dir is None: |
|
|
local_dir = settings.cache_dir / "models" / EasyOcrModel._model_repo_folder |
|
|
|
|
|
local_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
|
|
|
download_list = [] |
|
|
for model_name in detection_models: |
|
|
if model_name in det_models_dict: |
|
|
download_list.append(det_models_dict[model_name]) |
|
|
for model_name in recognition_models: |
|
|
if model_name in rec_models_dict["gen2"]: |
|
|
download_list.append(rec_models_dict["gen2"][model_name]) |
|
|
|
|
|
|
|
|
for model_details in download_list: |
|
|
buf = download_url_with_progress(model_details["url"], progress=progress) |
|
|
with zipfile.ZipFile(buf, "r") as zip_ref: |
|
|
zip_ref.extractall(local_dir) |
|
|
|
|
|
return local_dir |
|
|
|
|
|
def __call__( |
|
|
self, conv_res: ConversionResult, page_batch: Iterable[Page] |
|
|
) -> Iterable[Page]: |
|
|
|
|
|
if not self.enabled: |
|
|
yield from page_batch |
|
|
return |
|
|
|
|
|
for page in page_batch: |
|
|
|
|
|
assert page._backend is not None |
|
|
if not page._backend.is_valid(): |
|
|
yield page |
|
|
else: |
|
|
with TimeRecorder(conv_res, "ocr"): |
|
|
ocr_rects = self.get_ocr_rects(page) |
|
|
|
|
|
all_ocr_cells = [] |
|
|
for ocr_rect in ocr_rects: |
|
|
|
|
|
if ocr_rect.area() == 0: |
|
|
continue |
|
|
high_res_image = page._backend.get_page_image( |
|
|
scale=self.scale, cropbox=ocr_rect |
|
|
) |
|
|
im = numpy.array(high_res_image) |
|
|
result = self.reader.readtext(im) |
|
|
|
|
|
del high_res_image |
|
|
del im |
|
|
|
|
|
cells = [ |
|
|
OcrCell( |
|
|
id=ix, |
|
|
text=line[1], |
|
|
confidence=line[2], |
|
|
bbox=BoundingBox.from_tuple( |
|
|
coord=( |
|
|
(line[0][0][0] / self.scale) + ocr_rect.l, |
|
|
(line[0][0][1] / self.scale) + ocr_rect.t, |
|
|
(line[0][2][0] / self.scale) + ocr_rect.l, |
|
|
(line[0][2][1] / self.scale) + ocr_rect.t, |
|
|
), |
|
|
origin=CoordOrigin.TOPLEFT, |
|
|
), |
|
|
) |
|
|
for ix, line in enumerate(result) |
|
|
if line[2] >= self.options.confidence_threshold |
|
|
] |
|
|
all_ocr_cells.extend(cells) |
|
|
|
|
|
|
|
|
page.cells = self.post_process_cells(all_ocr_cells, page.cells) |
|
|
|
|
|
|
|
|
if settings.debug.visualize_ocr: |
|
|
self.draw_ocr_rects_and_cells(conv_res, page, ocr_rects) |
|
|
|
|
|
yield page |
|
|
|