Spaces:
Runtime error
Runtime error
| import csv | |
| import io | |
| import logging | |
| import os | |
| import tempfile | |
| from subprocess import DEVNULL, PIPE, Popen | |
| from typing import Iterable, List, Optional, Tuple | |
| import pandas as pd | |
| from docling_core.types.doc import BoundingBox, CoordOrigin | |
| from docling.datamodel.base_models import Cell, OcrCell, Page | |
| from docling.datamodel.document import ConversionResult | |
| from docling.datamodel.pipeline_options import TesseractCliOcrOptions | |
| from docling.datamodel.settings import settings | |
| from docling.models.base_ocr_model import BaseOcrModel | |
| from docling.utils.ocr_utils import map_tesseract_script | |
| from docling.utils.profiling import TimeRecorder | |
# Module-level logger; named after the importing module path.
_log = logging.getLogger(__name__)
class TesseractOcrCliModel(BaseOcrModel):
    """OCR model that shells out to the Tesseract command-line binary.

    For every OCR rectangle of a page, the region is rendered to a temporary
    PNG, passed to the ``tesseract`` executable configured in
    ``options.tesseract_cmd``, and the TSV output is converted to ``OcrCell``
    objects expressed in page coordinates.
    """

    def __init__(self, enabled: bool, options: TesseractCliOcrOptions):
        """Set up the model and, when enabled, probe the Tesseract binary.

        Args:
            enabled: Whether OCR should run at all.
            options: Tesseract CLI options (command, languages, tessdata path).

        Raises:
            RuntimeError: If the model is enabled and querying the Tesseract
                version or its installed languages fails for any reason. The
                original exception is attached as ``__cause__``.
        """
        super().__init__(enabled=enabled, options=options)
        self.options: TesseractCliOcrOptions

        self.scale = 3  # multiplier for 72 dpi == 216 dpi.

        self._name: Optional[str] = None
        self._version: Optional[str] = None
        self._tesseract_languages: Optional[List[str]] = None
        self._script_prefix: Optional[str] = None

        if self.enabled:
            try:
                self._get_name_and_version()
                self._set_languages_and_prefix()
            except Exception as exc:
                # Chain the cause so the underlying failure (e.g. a missing
                # binary) stays visible in the traceback.
                raise RuntimeError(
                    f"Tesseract is not available, aborting: {exc} "
                    "Install tesseract on your system and the tesseract binary is discoverable. "
                    "The actual command for Tesseract can be specified in `pipeline_options.ocr_options.tesseract_cmd='tesseract'`. "
                    "Alternatively, Docling has support for other OCR engines. See the documentation."
                ) from exc

    def _get_name_and_version(self) -> Tuple[str, str]:
        """Return ``(name, version)`` of the Tesseract binary, caching the result.

        Runs ``<tesseract_cmd> --version`` once and parses the first line of
        its output. The first whitespace-separated token is the name and the
        second is the version; ``"XXX"`` is used when no version is present.
        """
        if self._name is not None and self._version is not None:
            return self._name, self._version

        cmd = [self.options.tesseract_cmd, "--version"]
        proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
        # communicate() already waits for the process to terminate.
        stdout, stderr = proc.communicate()

        # HACK: Windows versions of Tesseract output the version to stdout, Linux versions
        # to stderr, so check both.
        banner = stdout.decode("utf8").strip() or stderr.decode("utf8").strip()
        version_line = banner.split("\n")[0].strip()

        # If everything else fails...
        if not version_line:
            version_line = "tesseract XXX"

        # Split on any whitespace and ignore trailing tokens, so banners with
        # extra words or repeated spaces do not break the two-value unpacking.
        tokens = version_line.split()
        name = tokens[0]
        version = tokens[1] if len(tokens) > 1 else "XXX"

        self._name = name
        self._version = version

        return name, version

    def _run_tesseract(self, ifilename: str) -> pd.DataFrame:
        """Run the Tesseract CLI on an image and return the recognised text rows.

        Args:
            ifilename: Path of the image file to OCR.

        Returns:
            The rows of Tesseract's TSV output whose ``text`` column is
            non-empty after stripping whitespace.
        """
        cmd = [self.options.tesseract_cmd]

        # Treat a missing language list like an empty one in both branches.
        langs = self.options.lang or []
        if "auto" in langs:
            lang = self._detect_language(ifilename)
            if lang is not None:
                cmd.extend(["-l", lang])
        elif langs:
            cmd.extend(["-l", "+".join(langs)])

        if self.options.path is not None:
            cmd.extend(["--tessdata-dir", self.options.path])

        cmd += [ifilename, "stdout", "tsv"]
        _log.info("command: %s", " ".join(cmd))

        proc = Popen(cmd, stdout=PIPE, stderr=DEVNULL)
        output, _ = proc.communicate()

        # Decode the byte string to a regular string
        decoded_data = output.decode("utf-8")

        # Read the TSV produced by Tesseract. The text column is forced to str
        # and default NA parsing is disabled so that recognised words such as
        # "NA", "null" or purely numeric tokens are kept verbatim instead of
        # being turned into NaN / floats.
        df = pd.read_csv(
            io.StringIO(decoded_data),
            quoting=csv.QUOTE_NONE,
            sep="\t",
            dtype={"text": str},
            keep_default_na=False,
        )

        # Filter rows that contain actual text (ignore header or empty rows)
        has_text = df["text"].notnull() & (df["text"].str.strip() != "")
        return df[has_text]

    def _detect_language(self, ifilename: str) -> Optional[str]:
        """Run tesseract in PSM 0 mode to detect the language.

        Args:
            ifilename: Path of the image file to analyse.

        Returns:
            The Tesseract language name for the detected script, or ``None``
            when no script is reported or the matching language is not among
            the installed languages.
        """
        assert self._tesseract_languages is not None

        cmd = [self.options.tesseract_cmd]
        cmd.extend(["--psm", "0", "-l", "osd", ifilename, "stdout"])
        _log.info("command: %s", " ".join(cmd))

        proc = Popen(cmd, stdout=PIPE, stderr=DEVNULL)
        output, _ = proc.communicate()
        decoded_data = output.decode("utf-8")

        # No output at all means there is no "Script" line either; skip the
        # parser instead of feeding it an empty buffer.
        if not decoded_data.strip():
            _log.warning("Tesseract cannot detect the script of the page")
            return None

        # The output is parsed as "key: value" lines.
        df = pd.read_csv(
            io.StringIO(decoded_data), sep=":", header=None, names=["key", "value"]
        )
        scripts = df.loc[df["key"] == "Script"].value.tolist()
        if len(scripts) == 0:
            _log.warning("Tesseract cannot detect the script of the page")
            return None

        script = map_tesseract_script(scripts[0].strip())
        lang = f"{self._script_prefix}{script}"

        # Check if the detected language has been installed
        if lang not in self._tesseract_languages:
            msg = f"Tesseract detected the script '{script}' and language '{lang}'."
            msg += " However this language is not installed in your system and will be ignored."
            _log.warning(msg)
            return None

        _log.debug(
            f"Using tesseract model for the detected script '{script}' and language '{lang}'"
        )
        return lang

    def _set_languages_and_prefix(self) -> None:
        """Read and set the languages installed in tesseract and decide the script prefix.

        Populates ``self._tesseract_languages`` from ``--list-langs`` (the
        first output line is skipped) and sets ``self._script_prefix`` to
        ``"script/"`` when any language name starts with that prefix,
        otherwise to the empty string.
        """
        # Get all languages
        cmd = [self.options.tesseract_cmd, "--list-langs"]
        _log.info("command: %s", " ".join(cmd))

        proc = Popen(cmd, stdout=PIPE, stderr=DEVNULL)
        output, _ = proc.communicate()
        decoded_data = output.decode("utf-8")

        df = pd.read_csv(io.StringIO(decoded_data), header=None)
        self._tesseract_languages = df[0].tolist()[1:]

        # Decide the script prefix
        has_script_dir = any(
            name.startswith("script/") for name in self._tesseract_languages
        )
        self._script_prefix = "script/" if has_script_dir else ""

    def __call__(
        self, conv_res: ConversionResult, page_batch: Iterable[Page]
    ) -> Iterable[Page]:
        """OCR each page of the batch and yield the pages with updated cells.

        Pages are passed through untouched when the model is disabled or the
        page backend is not valid. Otherwise every non-empty OCR rectangle is
        rendered at ``self.scale``, run through Tesseract, and the resulting
        cells are merged into ``page.cells`` via ``post_process_cells``.
        """
        if not self.enabled:
            yield from page_batch
            return

        for page in page_batch:
            assert page._backend is not None
            if not page._backend.is_valid():
                yield page
            else:
                with TimeRecorder(conv_res, "ocr"):
                    ocr_rects = self.get_ocr_rects(page)

                    all_ocr_cells = []
                    for ocr_rect in ocr_rects:
                        # Skip zero area boxes
                        if ocr_rect.area() == 0:
                            continue

                        high_res_image = page._backend.get_page_image(
                            scale=self.scale, cropbox=ocr_rect
                        )

                        # Initialised up front so the cleanup below cannot hit
                        # an unbound name if creating the temp file fails.
                        fname: Optional[str] = None
                        try:
                            with tempfile.NamedTemporaryFile(
                                suffix=".png", mode="w+b", delete=False
                            ) as image_file:
                                fname = image_file.name
                                high_res_image.save(image_file)

                            # The file is closed before Tesseract reads it.
                            df = self._run_tesseract(fname)
                        finally:
                            if fname is not None and os.path.exists(fname):
                                os.remove(fname)

                        # Convert each TSV row (bounding box and text) to a cell.
                        for ix, row in df.iterrows():
                            # Tesseract reports pixel boxes with a top-left
                            # origin in the scaled crop.
                            left = float(row["left"])
                            top = float(row["top"])
                            right = left + float(row["width"])
                            bottom = top + float(row["height"])

                            cell = OcrCell(
                                id=ix,
                                text=row["text"],
                                confidence=row["conf"] / 100.0,
                                bbox=BoundingBox.from_tuple(
                                    # Undo the render scale, then shift by the
                                    # OCR rectangle's own offset.
                                    coord=(
                                        (left / self.scale) + ocr_rect.l,
                                        (top / self.scale) + ocr_rect.t,
                                        (right / self.scale) + ocr_rect.l,
                                        (bottom / self.scale) + ocr_rect.t,
                                    ),
                                    origin=CoordOrigin.TOPLEFT,
                                ),
                            )
                            all_ocr_cells.append(cell)

                    # Post-process the cells
                    page.cells = self.post_process_cells(all_ocr_cells, page.cells)

                # DEBUG code:
                if settings.debug.visualize_ocr:
                    self.draw_ocr_rects_and_cells(conv_res, page, ocr_rects)

                yield page