|
|
"""Backend to parse patents from the United States Patent Office (USPTO). |
|
|
|
|
|
The parsers included in this module can handle patent grants published since 1976 and |
|
|
patent applications since 2001. |
|
|
The original files can be found in https://bulkdata.uspto.gov. |
|
|
""" |
|
|
|
|
|
import html |
|
|
import logging |
|
|
import re |
|
|
import xml.sax |
|
|
import xml.sax.xmlreader |
|
|
from abc import ABC, abstractmethod |
|
|
from enum import Enum, unique |
|
|
from io import BytesIO |
|
|
from pathlib import Path |
|
|
from typing import Any, Final, Optional, Union |
|
|
|
|
|
from bs4 import BeautifulSoup, Tag |
|
|
from docling_core.types.doc import ( |
|
|
DocItem, |
|
|
DocItemLabel, |
|
|
DoclingDocument, |
|
|
DocumentOrigin, |
|
|
TableCell, |
|
|
TableData, |
|
|
TextItem, |
|
|
) |
|
|
from docling_core.types.doc.document import LevelNumber |
|
|
from pydantic import NonNegativeInt |
|
|
from typing_extensions import Self, TypedDict, override |
|
|
|
|
|
from docling.backend.abstract_backend import DeclarativeDocumentBackend |
|
|
from docling.datamodel.base_models import InputFormat |
|
|
from docling.datamodel.document import InputDocument |
|
|
|
|
|
# Module-level logger shared by the backend and all parser classes in this file.
_log = logging.getLogger(__name__)

# XML prolog prepended to each extracted table fragment before it is handed to
# the table parser as a standalone XML document.
XML_DECLARATION: Final = '<?xml version="1.0" encoding="UTF-8"?>'
|
|
|
|
|
|
|
|
@unique |
|
|
class PatentHeading(Enum): |
|
|
"""Text of docling headings for tagged sections in USPTO patent documents.""" |
|
|
|
|
|
ABSTRACT = "ABSTRACT", 2 |
|
|
CLAIMS = "CLAIMS", 2 |
|
|
|
|
|
@override |
|
|
def __new__(cls, value: str, _) -> Self: |
|
|
obj = object.__new__(cls) |
|
|
obj._value_ = value |
|
|
return obj |
|
|
|
|
|
@override |
|
|
def __init__(self, _, level: LevelNumber) -> None: |
|
|
self.level: LevelNumber = level |
|
|
|
|
|
|
|
|
class PatentUsptoDocumentBackend(DeclarativeDocumentBackend):
    """Declarative backend for USPTO patent files.

    While loading, the backend scans the input line by line. A line starting
    with ``<!DOCTYPE`` or the exact line ``PATN\\n`` selects the parser that
    matches the file format. The whole content is kept in ``patent_content``
    and handed to that parser in `convert`.
    """

    @override
    def __init__(
        self, in_doc: InputDocument, path_or_stream: Union[BytesIO, Path]
    ) -> None:
        """Load the patent content and pick the parser for its format.

        Args:
            in_doc: The input document descriptor.
            path_or_stream: The patent file, as a path or as an in-memory stream.

        Raises:
            RuntimeError: If the content cannot be read or decoded as UTF-8.
        """
        super().__init__(in_doc, path_or_stream)

        self.patent_content: str = ""
        self.parser: Optional[PatentUspto] = None

        try:
            lines: list[str] = []
            if isinstance(self.path_or_stream, BytesIO):
                # Iterating a BytesIO yields one line per item, like readline().
                lines = [raw.decode("utf-8") for raw in self.path_or_stream]
            elif isinstance(self.path_or_stream, Path):
                with open(self.path_or_stream, encoding="utf-8") as file_obj:
                    lines = list(file_obj)

            for line in lines:
                if line.startswith("<!DOCTYPE") or line == "PATN\n":
                    # If several marker lines exist, the last one wins.
                    self._set_parser(line)
            # Join once instead of growing the string line by line.
            self.patent_content = "".join(lines)
        except Exception as exc:
            raise RuntimeError(
                f"Could not initialize USPTO backend for file with hash {self.document_hash}."
            ) from exc

    def _set_parser(self, doctype: str) -> None:
        """Select the parser that matches a format marker line.

        Args:
            doctype: The marker line, either the APS ``PATN`` line or an XML
                ``<!DOCTYPE`` declaration. The DOCTYPE is matched
                case-insensitively. The parser is set to None if the line is
                not recognized.
        """
        doctype_line = doctype.lower()
        if doctype == "PATN\n":
            self.parser = PatentUsptoGrantAps()
        elif (
            "us-patent-application-v4" in doctype_line
            or "us-patent-grant-v4" in doctype_line
        ):
            # Applications and grants in version 4.x share the same parser.
            self.parser = PatentUsptoIce()
        elif "us-grant-025" in doctype_line:
            self.parser = PatentUsptoGrantV2()
        elif all(
            item in doctype_line
            for item in ("patent-application-publication", "pap-v1")
        ):
            self.parser = PatentUsptoAppV1()
        else:
            self.parser = None

    @override
    def is_valid(self) -> bool:
        """Tell whether some content was loaded and a parser was selected."""
        return bool(self.patent_content) and bool(self.parser)

    @classmethod
    @override
    def supports_pagination(cls) -> bool:
        """Report that this backend does not support pagination."""
        return False

    @override
    def unload(self) -> None:
        """Do nothing; this method performs no cleanup."""
        return

    @classmethod
    @override
    def supported_formats(cls) -> set[InputFormat]:
        """Return the set of input formats handled by this backend."""
        return {InputFormat.XML_USPTO}

    @override
    def convert(self) -> DoclingDocument:
        """Parse the loaded content into a docling document.

        Returns:
            The parsed document, with its name and origin filled in.

        Raises:
            RuntimeError: If no parser was selected at initialization or if
                the parser returns no document.
        """
        if self.parser is None:
            raise RuntimeError(
                f"Cannot convert doc (hash={self.document_hash}, "
                f"name={self.file.name}) because the backend failed to init."
            )

        doc = self.parser.parse(self.patent_content)
        if doc is None:
            raise RuntimeError(
                f"Failed to convert doc (hash={self.document_hash}, "
                f"name={self.file.name})."
            )

        doc.name = self.file.name or "file"
        # APS files are plain text, all the other supported formats are XML.
        mime_type = (
            "text/plain"
            if isinstance(self.parser, PatentUsptoGrantAps)
            else "application/xml"
        )
        doc.origin = DocumentOrigin(
            mimetype=mime_type,
            binary_hash=self.document_hash,
            filename=self.file.name or "file",
        )

        return doc
|
|
|
|
|
|
|
|
class PatentUspto(ABC):
    """Base class for the parsers of US Patent Office documents."""

    @abstractmethod
    def parse(self, patent_content: str) -> Optional[DoclingDocument]:
        """Parse a USPTO patent.

        Args:
            patent_content: The content of a single patent in a USPTO file.

        Returns:
            The patent parsed as a docling document.
        """
        return None
|
|
|
|
|
|
|
|
class PatentUsptoIce(PatentUspto):
    """Parser of patent documents from the US Patent Office (ICE).

    The compatible formats are:
    - Patent Grant Full Text Data/XML Version 4.x ICE (from January 2005)
    - Patent Application Full Text Data/XML Version 4.x ICE (from January 2005)
    """

    @override
    def __init__(self) -> None:
        """Build an instance of PatentUsptoIce class."""
        self.handler = PatentUsptoIce.PatentHandler()
        # Matches a whole <table ...>...</table> fragment starting at a line start.
        self.pattern = re.compile(r"^(<table .*?</table>)", re.MULTILINE | re.DOTALL)

    @override
    def parse(self, patent_content: str) -> Optional[DoclingDocument]:
        """Parse a USPTO patent in ICE format.

        The text is parsed with the SAX handler, which adds an empty
        placeholder for each table. The table fragments are then extracted
        from the raw content with a regular expression, parsed separately and
        assigned to the placeholders by position.

        Args:
            patent_content: The content of a single patent in a USPTO file.

        Returns:
            The patent parsed as a docling document, or None if the content is
            not well-formed XML. The handler's document is returned as is,
            which is None when no root patent element was found.
        """
        try:
            xml.sax.parseString(patent_content, self.handler)
        except xml.sax.SAXParseException as exc_sax:
            _log.error(f"Error in parsing USPTO document: {exc_sax}")

            return None

        doc = self.handler.doc
        if doc:
            raw_tables = re.findall(self.pattern, patent_content)
            parsed_tables: list[TableData] = []
            _log.debug(f"Found {len(raw_tables)} tables to be parsed with XmlTable.")
            for table in raw_tables:
                table_parser = XmlTable(XML_DECLARATION + "\n" + table)
                try:
                    table_data = table_parser.parse()
                    if table_data:
                        parsed_tables.append(table_data)
                except Exception as exc_table:
                    _log.error(f"Error in parsing USPTO tables: {exc_table}")
            # Placeholders are only filled when every table could be matched,
            # since the assignment relies on the position alone.
            if len(parsed_tables) != len(doc.tables):
                _log.error(
                    f"Number of referenced ({len(doc.tables)}) and parsed "
                    f"({len(parsed_tables)}) tables differ."
                )
            else:
                for idx, item in enumerate(parsed_tables):
                    doc.tables[idx].data = item

        return doc

    class PatentHandler(xml.sax.handler.ContentHandler):
        """SAX ContentHandler for patent documents."""

        APP_DOC_ELEMENT: Final = "us-patent-application"
        GRANT_DOC_ELEMENT: Final = "us-patent-grant"

        @unique
        class Element(Enum):
            """Represents an element of interest in the patent application document.

            Members are declared as ``(tag, is_text)`` pairs. The tag is the
            member value and ``is_text`` tells whether character data found
            directly inside the element is collected.
            """

            ABSTRACT = "abstract", True
            TITLE = "invention-title", True
            CLAIMS = "claims", False
            CLAIM = "claim", False
            CLAIM_TEXT = "claim-text", True
            PARAGRAPH = "p", True
            HEADING = "heading", True
            DESCRIPTION = "description", False
            TABLE = "table", False
            DRAWINGS = "description-of-drawings", True
            STYLE_SUPERSCRIPT = "sup", True
            STYLE_SUBSCRIPT = "sub", True
            MATHS = "maths", False

            @override
            def __new__(cls, value: str, _) -> Self:
                obj = object.__new__(cls)
                obj._value_ = value
                return obj

            @override
            def __init__(self, _, is_text: bool) -> None:
                self.is_text: bool = is_text

        @override
        def __init__(self) -> None:
            """Build an instance of the patent handler."""
            super().__init__()

            # Document being built; created when the root patent element starts.
            self.doc: Optional[DoclingDocument] = None
            # Current level in the document hierarchy.
            self.level: LevelNumber = 1
            # Parent item to attach new items to, for each hierarchy level.
            self.parents: dict[LevelNumber, Optional[DocItem]] = {1: None}
            # Stack of the registered elements that are currently open.
            self.property: list[str]
            self.claim: str
            self.claims: list[str]
            self.abstract: str
            self.text: str
            self._clean_data()

            self.style_html = HtmlEntity()

        @override
        def startElement(self, tag, attributes):
            """Signal the start of an element.

            Args:
                tag: The element tag.
                attributes: The element attributes.
            """
            if tag in (
                self.APP_DOC_ELEMENT,
                self.GRANT_DOC_ELEMENT,
            ):
                self.doc = DoclingDocument(name="file")
                self.text = ""
            self._start_registered_elements(tag, attributes)

        @override
        def skippedEntity(self, name):
            """Receive notification of a skipped entity.

            HTML entities will be skipped by the parser. This method will unescape them
            and add them to the text.

            Args:
                name: Entity name.
            """
            if self.property:
                elm_val = self.property[-1]
                element = self.Element(elm_val)
                if element.is_text:
                    escaped = self.style_html.get_greek_from_iso8879(f"&{name};")
                    unescaped = html.unescape(escaped)
                    if unescaped == escaped:
                        _log.debug(f"Unrecognized HTML entity: {name}")
                        return

                    if element in (
                        self.Element.STYLE_SUPERSCRIPT,
                        self.Element.STYLE_SUBSCRIPT,
                    ):
                        # Styled text is only kept when it sits under a text element.
                        if len(self.property) < 2:
                            return
                        parent_val = self.property[-2]
                        parent = self.Element(parent_val)
                        if parent.is_text:
                            self.text += self._apply_style(unescaped, elm_val)
                    else:
                        self.text += unescaped

        @override
        def endElement(self, tag):
            """Signal the end of an element.

            Args:
                tag: The element tag.
            """
            if tag in (
                self.APP_DOC_ELEMENT,
                self.GRANT_DOC_ELEMENT,
            ):
                self._clean_data()
            self._end_registered_element(tag)

        @override
        def characters(self, content):
            """Receive notification of character data.

            Args:
                content: Data reported by the handler.
            """
            if self.property:
                elm_val = self.property[-1]
                element = self.Element(elm_val)
                if element.is_text:
                    if element in (
                        self.Element.STYLE_SUPERSCRIPT,
                        self.Element.STYLE_SUBSCRIPT,
                    ):
                        # Styled text is only kept when it sits under a text element.
                        if len(self.property) < 2:
                            return
                        parent_val = self.property[-2]
                        parent = self.Element(parent_val)
                        if parent.is_text:
                            self.text += self._apply_style(content, elm_val)
                    else:
                        self.text += content

        def _start_registered_elements(
            self, tag: str, attributes: xml.sax.xmlreader.AttributesImpl
        ) -> None:
            """Push a registered element on the stack of open elements.

            Args:
                tag: The element tag. Tags that are not values of `Element`
                    are ignored.
                attributes: The element attributes.
            """
            if tag in [member.value for member in self.Element]:
                # A claim-text may start inside another claim-text: flush the
                # text collected so far into the current claim.
                if (
                    tag == self.Element.CLAIM_TEXT.value
                    and self.property
                    and self.property[-1] == tag
                    and self.text.strip()
                ):
                    self.claim += " " + self.text.strip()
                    self.text = ""
                elif tag == self.Element.HEADING.value:
                    level_attr: str = attributes.get("level", "")
                    new_level: int = int(level_attr) if level_attr.isnumeric() else 1
                    # Fallback: the smallest level that has a registered parent.
                    max_level = min(self.parents.keys())
                    # Shift the declared level by one, if a parent exists there.
                    self.level = (
                        new_level + 1 if (new_level + 1) in self.parents else max_level
                    )
                self.property.append(tag)

        def _end_registered_element(self, tag: str) -> None:
            """Pop the innermost open element and store its collected text.

            Args:
                tag: The tag of the element being closed. Tags that are not
                    values of `Element` are ignored.
            """
            if tag in [item.value for item in self.Element] and self.property:
                current_tag = self.property.pop()
                self._add_property(current_tag, self.text.strip())

        def _add_property(self, name: str, text: str) -> None:
            """Add the content of a closed element to the document.

            Args:
                name: The tag of the closed element.
                text: The stripped text collected for that element.
            """
            if not name or not self.doc:
                return

            if name == self.Element.TITLE.value:
                if text:
                    self.parents[self.level + 1] = self.doc.add_title(
                        parent=self.parents[self.level],
                        text=text,
                    )
                    self.level += 1
                self.text = ""

            elif name == self.Element.ABSTRACT.value:
                # The abstract text was accumulated from its paragraphs.
                if self.abstract:
                    heading_text = PatentHeading.ABSTRACT.value
                    heading_level = (
                        PatentHeading.ABSTRACT.level
                        if PatentHeading.ABSTRACT.level in self.parents
                        else 1
                    )
                    abstract_item = self.doc.add_heading(
                        heading_text,
                        level=heading_level,
                        parent=self.parents[heading_level],
                    )
                    self.doc.add_text(
                        label=DocItemLabel.PARAGRAPH,
                        text=self.abstract,
                        parent=abstract_item,
                    )

            elif name == self.Element.CLAIM_TEXT.value:
                text = re.sub("\\s+", " ", text).strip()
                if text:
                    self.claim += " " + text
                self.text = ""

            elif name == self.Element.CLAIM.value and self.claim:
                self.claims.append(self.claim.strip())
                self.claim = ""

            elif name == self.Element.CLAIMS.value and self.claims:
                heading_text = PatentHeading.CLAIMS.value
                heading_level = (
                    PatentHeading.CLAIMS.level
                    if PatentHeading.CLAIMS.level in self.parents
                    else 1
                )
                claims_item = self.doc.add_heading(
                    heading_text,
                    level=heading_level,
                    parent=self.parents[heading_level],
                )
                for text in self.claims:
                    self.doc.add_text(
                        label=DocItemLabel.PARAGRAPH, text=text, parent=claims_item
                    )

            elif name == self.Element.PARAGRAPH.value and text:
                # Collapse the runs of whitespace inside the paragraph.
                text = re.sub("\\s+", " ", text)
                if self.Element.ABSTRACT.value in self.property:
                    # Abstract paragraphs are emitted when the abstract closes.
                    self.abstract = (
                        (self.abstract + " " + text) if self.abstract else text
                    )
                else:
                    self.doc.add_text(
                        label=DocItemLabel.PARAGRAPH,
                        text=text,
                        parent=self.parents[self.level],
                    )
                self.text = ""

            elif name == self.Element.HEADING.value and text:
                self.parents[self.level + 1] = self.doc.add_heading(
                    text=text,
                    level=self.level,
                    parent=self.parents[self.level],
                )
                self.level += 1
                self.text = ""

            elif name == self.Element.TABLE.value:
                # Add an empty table as a placeholder; its data is assigned
                # once the SAX parsing is done.
                empty_table = TableData(num_rows=0, num_cols=0, table_cells=[])
                self.doc.add_table(
                    data=empty_table,
                    parent=self.parents[self.level],
                )

        def _apply_style(self, text: str, style_tag: str) -> str:
            """Apply an HTML style to text.

            Args:
                text: A string containing plain text.
                style_tag: An HTML tag name for styling text. If the tag name is not
                    recognized as one of the supported styles, the method will return
                    the original `text`.

            Returns:
                A string after applying the style.
            """
            formatted = text

            if style_tag == self.Element.STYLE_SUPERSCRIPT.value:
                formatted = html.unescape(self.style_html.get_superscript(text))
            elif style_tag == self.Element.STYLE_SUBSCRIPT.value:
                formatted = html.unescape(self.style_html.get_subscript(text))

            return formatted

        def _clean_data(self) -> None:
            """Reset the variables from stream data."""
            # Resetting the text here guarantees the attribute always exists.
            self.text = ""
            self.property = []
            self.claim = ""
            self.claims = []
            self.abstract = ""
|
|
|
|
|
|
|
|
class PatentUsptoGrantV2(PatentUspto):
    """Parser of patent documents from the US Patent Office (grants v2.5).

    The compatible format is:
    - Patent Grant Full Text Data/XML Version 2.5 (from January 2002 till December 2004)
    """

    @override
    def __init__(self) -> None:
        """Build an instance of PatentUsptoGrantV2 class."""
        self.handler = PatentUsptoGrantV2.PatentHandler()
        # Matches a whole <table ...>...</table> fragment starting at a line start.
        self.pattern = re.compile(r"^(<table .*?</table>)", re.MULTILINE | re.DOTALL)

    @override
    def parse(self, patent_content: str) -> Optional[DoclingDocument]:
        """Parse a USPTO patent grant in XML format version 2.5.

        The text is parsed with the SAX handler, which adds an empty
        placeholder for each table. The table fragments are then extracted
        from the raw content with a regular expression, parsed separately and
        assigned to the placeholders by position.

        Args:
            patent_content: The content of a single patent in a USPTO file.

        Returns:
            The patent parsed as a docling document, or None if the content is
            not well-formed XML. The handler's document is returned as is,
            which is None when no root patent element was found.
        """
        try:
            xml.sax.parseString(patent_content, self.handler)
        except xml.sax.SAXParseException as exc_sax:
            _log.error(f"Error in parsing USPTO document: {exc_sax}")

            return None

        doc = self.handler.doc
        if doc:
            raw_tables = re.findall(self.pattern, patent_content)
            parsed_tables: list[TableData] = []
            _log.debug(f"Found {len(raw_tables)} tables to be parsed with XmlTable.")
            for table in raw_tables:
                table_parser = XmlTable(XML_DECLARATION + "\n" + table)
                try:
                    table_data = table_parser.parse()
                    if table_data:
                        parsed_tables.append(table_data)
                except Exception as exc_table:
                    _log.error(f"Error in parsing USPTO tables: {exc_table}")
            # Placeholders are only filled when every table could be matched,
            # since the assignment relies on the position alone.
            if len(parsed_tables) != len(doc.tables):
                _log.error(
                    f"Number of referenced ({len(doc.tables)}) and parsed "
                    f"({len(parsed_tables)}) tables differ."
                )
            else:
                for idx, item in enumerate(parsed_tables):
                    doc.tables[idx].data = item

        return doc

    class PatentHandler(xml.sax.handler.ContentHandler):
        """SAX ContentHandler for patent documents."""

        GRANT_DOC_ELEMENT: Final = "PATDOC"
        CLAIM_STATEMENT: Final = "What is claimed is:"

        @unique
        class Element(Enum):
            """Represents an element of interest in the patent application document.

            Members are declared as ``(tag, is_text)`` pairs. The tag is the
            member value and ``is_text`` tells whether character data found
            directly inside the element is collected.
            """

            PDAT = "PDAT", True
            ABSTRACT = ("SDOAB", False)
            SDOCL = ("SDOCL", False)
            TITLE = ("B540", False)
            CLAIMS = ("CL", False)
            CLAIM = ("CLM", False)
            PARAGRAPH = ("PARA", True)
            HEADING = ("H", True)
            DRAWINGS = ("DRWDESC", False)
            STYLE_SUPERSCRIPT = ("SP", False)
            STYLE_SUBSCRIPT = ("SB", False)
            STYLE_ITALIC = ("ITALIC", False)
            CWU = ("CWU", False)
            TABLE = ("table", False)

            @override
            def __new__(cls, value: str, _) -> Self:
                obj = object.__new__(cls)
                obj._value_ = value
                return obj

            @override
            def __init__(self, _, is_text: bool) -> None:
                self.is_text: bool = is_text

        @override
        def __init__(self) -> None:
            """Build an instance of the patent handler."""
            super().__init__()

            # Document being built; created when the root patent element starts.
            self.doc: Optional[DoclingDocument] = None
            # Current level in the document hierarchy.
            self.level: LevelNumber = 1
            # Parent item to attach new items to, for each hierarchy level.
            self.parents: dict[LevelNumber, Optional[DocItem]] = {1: None}
            # Stack of the registered elements that are currently open.
            self.property: list[str]
            self.claim: str
            self.claims: list[str]
            self.paragraph: str
            self.abstract: str
            self.text: str
            self._clean_data()

            self.style_html = HtmlEntity()

        @override
        def startElement(self, tag, attributes):
            """Signal the start of an element.

            Args:
                tag: The element tag.
                attributes: The element attributes.
            """
            if tag == self.GRANT_DOC_ELEMENT:
                self.doc = DoclingDocument(name="file")
                self.text = ""
            self._start_registered_elements(tag, attributes)

        @override
        def skippedEntity(self, name):
            """Receive notification of a skipped entity.

            HTML entities will be skipped by the parser. This method will unescape them
            and add them to the text.

            Args:
                name: Entity name.
            """
            if self.property:
                elm_val = self.property[-1]
                element = self.Element(elm_val)
                if element.is_text:
                    escaped = self.style_html.get_greek_from_iso8879(f"&{name};")
                    unescaped = html.unescape(escaped)
                    if unescaped == escaped:
                        # Use the module logger, like the rest of this file.
                        _log.debug("Unrecognized HTML entity: " + name)
                        return

                    # NOTE: the style members are declared with is_text=False
                    # in Element, so this branch is not taken with the current
                    # table; it mirrors the other handlers of this module.
                    if element in (
                        self.Element.STYLE_SUPERSCRIPT,
                        self.Element.STYLE_SUBSCRIPT,
                    ):
                        # Styled text is only kept when it sits under a text element.
                        if len(self.property) < 2:
                            return
                        parent_val = self.property[-2]
                        parent = self.Element(parent_val)
                        if parent.is_text:
                            self.text += self._apply_style(unescaped, elm_val)
                    else:
                        self.text += unescaped

        @override
        def endElement(self, tag):
            """Signal the end of an element.

            Args:
                tag: The element tag.
            """
            if tag == self.GRANT_DOC_ELEMENT:
                self._clean_data()
            self._end_registered_element(tag)

        @override
        def characters(self, content):
            """Receive notification of character data.

            Args:
                content: Data reported by the handler.
            """
            if self.property:
                elm_val = self.property[-1]
                element = self.Element(elm_val)
                if element.is_text:
                    if element in (
                        self.Element.STYLE_SUPERSCRIPT,
                        self.Element.STYLE_SUBSCRIPT,
                    ):
                        # Styled text is only kept when it sits under a text element.
                        if len(self.property) < 2:
                            return
                        parent_val = self.property[-2]
                        parent = self.Element(parent_val)
                        if parent.is_text:
                            self.text += self._apply_style(content, elm_val)
                    else:
                        self.text += content

        def _start_registered_elements(
            self, tag: str, attributes: xml.sax.xmlreader.AttributesImpl
        ) -> None:
            """Push a registered element on the stack of open elements.

            Args:
                tag: The element tag. Tags that are not values of `Element`
                    are ignored.
                attributes: The element attributes.
            """
            if tag in [member.value for member in self.Element]:
                # Headings inside an SDOCL element do not change the level.
                if (
                    tag == self.Element.HEADING.value
                    and self.Element.SDOCL.value not in self.property
                ):
                    level_attr: str = attributes.get("LVL", "")
                    new_level: int = int(level_attr) if level_attr.isnumeric() else 1
                    # Fallback: the smallest level that has a registered parent.
                    max_level = min(self.parents.keys())
                    # Shift the declared level by one, if a parent exists there.
                    self.level = (
                        new_level + 1 if (new_level + 1) in self.parents else max_level
                    )
                self.property.append(tag)

        def _end_registered_element(self, tag: str) -> None:
            """Pop the innermost open element and store its collected text.

            Args:
                tag: The tag of the element being closed. Tags that are not
                    values of `Element` are ignored.
            """
            if tag in [elm.value for elm in self.Element] and self.property:
                current_tag = self.property.pop()
                self._add_property(current_tag, self.text)

        def _add_property(self, name: str, text: str) -> None:
            """Add the content of a closed element to the document.

            Text arrives through PDAT elements and is routed according to the
            elements that are still open. The enclosing elements emit the
            accumulated content when they are closed.

            Args:
                name: The tag of the closed element.
                text: The text collected so far, not stripped.
            """
            if not name or not self.doc:
                return
            if name == self.Element.PDAT.value and text:
                if not self.property:
                    self.text = ""
                    return

                # The direct parent may be a style element.
                wrapper = self.property[-1]
                text = self._apply_style(text, wrapper)

                if self.Element.TITLE.value in self.property and text.strip():
                    title = text.strip()
                    self.parents[self.level + 1] = self.doc.add_title(
                        parent=self.parents[self.level],
                        text=title,
                    )
                    self.level += 1

                elif self.Element.ABSTRACT.value in self.property:
                    self.abstract += text

                elif self.Element.CLAIM.value in self.property:
                    self.claim += text

                # Paragraph text that belongs neither to a claim nor to the abstract.
                elif (
                    self.Element.PARAGRAPH.value in self.property
                    and self.Element.CLAIM.value not in self.property
                    and self.Element.ABSTRACT.value not in self.property
                ):
                    self.paragraph += text

                # Headings, except those inside an SDOCL element.
                elif (
                    self.Element.HEADING.value in self.property
                    and self.Element.SDOCL.value not in self.property
                    and text.strip()
                ):
                    self.parents[self.level + 1] = self.doc.add_heading(
                        text=text.strip(),
                        level=self.level,
                        parent=self.parents[self.level],
                    )
                    self.level += 1

                self.text = ""

            elif name == self.Element.CLAIM.value and self.claim.strip():
                self.claims.append(self.claim.strip())
                self.claim = ""

            elif name == self.Element.CLAIMS.value and self.claims:
                heading_text = PatentHeading.CLAIMS.value
                heading_level = (
                    PatentHeading.CLAIMS.level
                    if PatentHeading.CLAIMS.level in self.parents
                    else 1
                )
                claims_item = self.doc.add_heading(
                    heading_text,
                    level=heading_level,
                    parent=self.parents[heading_level],
                )
                for text in self.claims:
                    self.doc.add_text(
                        label=DocItemLabel.PARAGRAPH, text=text, parent=claims_item
                    )

            elif name == self.Element.ABSTRACT.value and self.abstract.strip():
                abstract = self.abstract.strip()
                heading_text = PatentHeading.ABSTRACT.value
                heading_level = (
                    PatentHeading.ABSTRACT.level
                    if PatentHeading.ABSTRACT.level in self.parents
                    else 1
                )
                abstract_item = self.doc.add_heading(
                    heading_text,
                    level=heading_level,
                    parent=self.parents[heading_level],
                )
                self.doc.add_text(
                    label=DocItemLabel.PARAGRAPH, text=abstract, parent=abstract_item
                )

            elif name == self.Element.PARAGRAPH.value:
                paragraph = self.paragraph.strip()
                if paragraph and self.Element.CLAIM.value not in self.property:
                    self.doc.add_text(
                        label=DocItemLabel.PARAGRAPH,
                        text=paragraph,
                        parent=self.parents[self.level],
                    )
                elif self.Element.CLAIM.value in self.property:
                    # Separate the paragraphs that form the text of a claim.
                    self.claim += " "
                self.paragraph = ""

            elif name == self.Element.TABLE.value:
                # Add an empty table as a placeholder; its data is assigned
                # once the SAX parsing is done.
                empty_table = TableData(num_rows=0, num_cols=0, table_cells=[])
                self.doc.add_table(
                    data=empty_table,
                    parent=self.parents[self.level],
                )

        def _apply_style(self, text: str, style_tag: str) -> str:
            """Apply an HTML style to text.

            Args:
                text: A string containing plain text.
                style_tag: An HTML tag name for styling text. If the tag name is not
                    recognized as one of the supported styles, the method will return
                    the original `text`.

            Returns:
                A string after applying the style.
            """
            formatted = text

            if style_tag == self.Element.STYLE_SUPERSCRIPT.value:
                formatted = html.unescape(self.style_html.get_superscript(text))
            elif style_tag == self.Element.STYLE_SUBSCRIPT.value:
                formatted = html.unescape(self.style_html.get_subscript(text))
            elif style_tag == self.Element.STYLE_ITALIC.value:
                formatted = html.unescape(self.style_html.get_math_italic(text))

            return formatted

        def _clean_data(self) -> None:
            """Reset the variables from stream data."""
            self.text = ""
            self.property = []
            self.claim = ""
            self.claims = []
            self.paragraph = ""
            self.abstract = ""
|
|
|
|
|
|
|
|
class PatentUsptoGrantAps(PatentUspto):
    """Parser of patents documents from the US Patent Office (grants APS).

    The compatible format is:
    - Patent Grant Full Text Data/APS (from January 1976 till December 2001)
    """

    @unique
    class Section(Enum):
        """Represent a section in a patent APS document."""

        ABSTRACT = "ABST"
        SUMMARY = "BSUM"
        DETAILS = "DETD"
        CLAIMS = "CLMS"
        DRAWINGS = "DRWD"

    @unique
    class Field(Enum):
        """Represent a field in a patent APS document."""

        DOC_NUMBER = "WKU"
        TITLE = "TTL"
        PARAGRAPH = "PAR"
        PARAGRAPH_1 = "PA1"
        PARAGRAPH_2 = "PA2"
        PARAGRAPH_3 = "PA3"
        TEXT = "PAL"
        CAPTION = "PAC"
        NUMBER = "NUM"
        NAME = "NAM"
        IPC = "ICL"
        ISSUED = "ISD"
        FILED = "APD"
        PATENT_NUMBER = "PNO"
        APPLICATION_NUMBER = "APN"
        APPLICATION_TYPE = "APT"
        COUNTRY = "CNT"

    @override
    def __init__(self) -> None:
        """Build an instance of PatentUsptoGrantAps class."""
        self.doc: Optional[DoclingDocument] = None
        # Current level in the document hierarchy.
        self.level: LevelNumber = 1
        # Parent item to attach new items to, for each hierarchy level.
        self.parents: dict[LevelNumber, Optional[DocItem]] = {1: None}

    def get_last_text_item(self) -> Optional[TextItem]:
        """Get the last text item at the current document level.

        Returns:
            The text item or None, if the current level parent has no children.
        """
        if self.doc:
            parent = self.parents[self.level]
            children = parent.children if parent is not None else []
        else:
            return None
        text_list: list[TextItem] = [
            item
            for item in self.doc.texts
            if isinstance(item, TextItem) and item.get_ref() in children
        ]

        if text_list:
            return text_list[-1]
        else:
            return None

    def store_section(self, section: str) -> None:
        """Store the section heading in the docling document.

        Only the predefined sections from PatentHeading will be handled.
        The other sections are created by the Field.CAPTION field.

        Args:
            section: A patent section name.
        """
        heading: PatentHeading
        if self.doc is None:
            return
        elif section == self.Section.ABSTRACT.value:
            heading = PatentHeading.ABSTRACT
        elif section == self.Section.CLAIMS.value:
            heading = PatentHeading.CLAIMS
        else:
            return None

        # Fall back to the top level when no parent exists at the heading level.
        self.level = heading.level if heading.level in self.parents else 1
        self.parents[self.level + 1] = self.doc.add_heading(
            heading.value,
            level=self.level,
            parent=self.parents[self.level],
        )
        self.level += 1

    def store_content(self, section: str, field: str, value: str) -> None:
        """Store the key value within a document section in the docling document.

        Args:
            section: A patent section name.
            field: A field name.
            value: A field value name.
        """
        if (
            not self.doc
            or not field
            or field not in [item.value for item in PatentUsptoGrantAps.Field]
        ):
            return

        if field == self.Field.TITLE.value:
            self.parents[self.level + 1] = self.doc.add_title(
                parent=self.parents[self.level], text=value
            )
            self.level += 1

        elif field == self.Field.TEXT.value and section == self.Section.ABSTRACT.value:
            # All the text fields of the abstract end up in a single paragraph.
            abst_item = self.get_last_text_item()
            if abst_item:
                abst_item.text += " " + value
            else:
                self.doc.add_text(
                    label=DocItemLabel.PARAGRAPH,
                    text=value,
                    parent=self.parents[self.level],
                )

        elif field == self.Field.NUMBER.value and section == self.Section.CLAIMS.value:
            # A number field opens a new claim, whose text is appended by the
            # paragraph fields that follow.
            self.doc.add_text(
                label=DocItemLabel.PARAGRAPH,
                text="",
                parent=self.parents[self.level],
            )

        elif (
            field
            in (
                self.Field.PARAGRAPH.value,
                self.Field.PARAGRAPH_1.value,
                self.Field.PARAGRAPH_2.value,
                self.Field.PARAGRAPH_3.value,
            )
            and section == self.Section.CLAIMS.value
        ):
            last_claim = self.get_last_text_item()
            if last_claim is None:
                last_claim = self.doc.add_text(
                    label=DocItemLabel.PARAGRAPH,
                    text="",
                    parent=self.parents[self.level],
                )

            last_claim.text += f" {value}" if last_claim.text else value

        elif field == self.Field.CAPTION.value and section in (
            self.Section.SUMMARY.value,
            self.Section.DETAILS.value,
            self.Section.DRAWINGS.value,
        ):
            # Captions become headings, at the level of the abstract heading.
            head_item = PatentHeading.ABSTRACT
            self.level = head_item.level if head_item.level in self.parents else 1
            self.parents[self.level + 1] = self.doc.add_heading(
                value,
                level=self.level,
                parent=self.parents[self.level],
            )
            self.level += 1

        elif field in (
            self.Field.PARAGRAPH.value,
            self.Field.PARAGRAPH_1.value,
            self.Field.PARAGRAPH_2.value,
            self.Field.PARAGRAPH_3.value,
        ) and section in (
            self.Section.SUMMARY.value,
            self.Section.DETAILS.value,
            self.Section.DRAWINGS.value,
        ):
            self.doc.add_text(
                label=DocItemLabel.PARAGRAPH,
                text=value,
                parent=self.parents[self.level],
            )

    @override
    def parse(self, patent_content: str) -> Optional[DoclingDocument]:
        """Parse a USPTO patent grant in APS format.

        Each line is split on its first run of two or more whitespace
        characters. A line without such a run is taken as a section name. A
        line with a non-empty first column starts a new field, while a line
        with an empty first column continues the value of the current field.
        Continuation lines of the form ``##STR<number>##`` are dropped.

        Args:
            patent_content: The content of a single patent in a USPTO file.

        Returns:
            The patent parsed as a docling document.
        """
        self.doc = DoclingDocument(name="file")
        # Start from a clean hierarchy, so that items are never attached to
        # nodes of a document parsed earlier with the same instance.
        self.level = 1
        self.parents = {1: None}

        section: str = ""
        key: str = ""
        value: str = ""
        for line in patent_content.splitlines():
            cols = re.split("\\s{2,}", line, maxsplit=1)
            # A new section or a new field closes the field being collected.
            if key and value and (len(cols) == 1 or (len(cols) == 2 and cols[0])):
                self.store_content(section, key, value)
                key = ""
                value = ""
            if len(cols) == 1:
                section = cols[0]
                self.store_section(section)
                _log.debug(f"Parsing section {section}")
            elif len(cols) == 2:
                if cols[0]:
                    key = cols[0]
                    value = cols[1]
                elif not re.match(r"^##STR\d+##$", cols[1]):
                    value += " " + cols[1]
        # Flush the last field of the file.
        if key and value:
            self.store_content(section, key, value)

        return self.doc
|
|
|
|
|
|
|
|
class PatentUsptoAppV1(PatentUspto):
    """Parser of patent documents from the US Patent Office (applications v1.x).

    The compatible format is:
    - Patent Application Full Text Data/XML Version 1.x (from March 2001 till December
    2004)
    """

    @override
    def __init__(self) -> None:
        """Build an instance of PatentUsptoAppV1 class."""
        self.handler = PatentUsptoAppV1.PatentHandler()
        # Tables are located with a regular expression on the raw content and
        # parsed separately by XmlTable.
        self.pattern = re.compile(r"^(<table .*?</table>)", re.MULTILINE | re.DOTALL)

    @override
    def parse(self, patent_content: str) -> Optional[DoclingDocument]:
        """Parse the XML content of a patent application.

        The text structure is built by the SAX handler, which only adds empty
        table placeholders. The tables are then parsed with XmlTable and their
        data is attached to the placeholders, provided that both counts match.

        Args:
            patent_content: The XML content of a single patent application.

        Returns:
            The parsed document, or None if the content is not well-formed XML.
        """
        try:
            xml.sax.parseString(patent_content, self.handler)
        except xml.sax.SAXParseException as exc_sax:
            _log.error(f"Error in parsing USPTO document: {exc_sax}")

            return None

        doc = self.handler.doc
        if doc:
            raw_tables = re.findall(self.pattern, patent_content)
            parsed_tables: list[TableData] = []
            _log.debug(f"Found {len(raw_tables)} tables to be parsed with XmlTable.")
            for table in raw_tables:
                table_parser = XmlTable(XML_DECLARATION + "\n" + table)
                try:
                    table_data = table_parser.parse()
                    if table_data:
                        parsed_tables.append(table_data)
                except Exception as exc_table:
                    # Best effort: a broken table must not discard the document.
                    _log.error(f"Error in parsing USPTO tables: {exc_table}")
            if len(parsed_tables) != len(doc.tables):
                _log.error(
                    f"Number of referenced ({len(doc.tables)}) and parsed "
                    f"({len(parsed_tables)}) tables differ."
                )
            else:
                for idx, item in enumerate(parsed_tables):
                    doc.tables[idx].data = item

        return doc

    class PatentHandler(xml.sax.handler.ContentHandler):
        """SAX ContentHandler for patent documents."""

        APP_DOC_ELEMENT: Final = "patent-application-publication"

        @unique
        class Element(Enum):
            """Represents an element of interest in the patent application document.

            Each member holds the tag name as its value and an `is_text` flag
            telling whether character data inside the tag is collected.
            """

            DRAWINGS = "brief-description-of-drawings", False
            ABSTRACT = "subdoc-abstract", False
            TITLE = "title-of-invention", True
            CLAIMS = "subdoc-claims", False
            CLAIM = "claim", False
            CLAIM_TEXT = "claim-text", True
            NUMBER = ("number", False)
            PARAGRAPH = "paragraph", True
            HEADING = "heading", True
            STYLE_SUPERSCRIPT = "superscript", True
            STYLE_SUBSCRIPT = "subscript", True

            TABLE = "table", False

            MATH = "math-cwu", False

            @override
            def __new__(cls, value: str, _) -> Self:
                obj = object.__new__(cls)
                obj._value_ = value
                return obj

            @override
            def __init__(self, _, is_text: bool) -> None:
                self.is_text: bool = is_text

        # Tag names of all the registered elements, computed once instead of on
        # every SAX event.
        _REGISTERED_TAGS: Final = frozenset(member.value for member in Element)

        @override
        def __init__(self) -> None:
            """Build an instance of the patent handler."""
            super().__init__()

            # Current patent being parsed
            self.doc: Optional[DoclingDocument] = None

            # Keep track of docling hierarchy level
            self.level: LevelNumber = 1

            # Keep track of docling parents by level
            self.parents: dict[LevelNumber, Optional[DocItem]] = {1: None}

            # Content to retain for the current patent
            self.property: list[str]
            self.claim: str
            self.claims: list[str]
            self.abstract: str
            self.text: str
            self._clean_data()

            # Helper to translate styled characters and Greek entities
            self.style_html = HtmlEntity()

        @override
        def startElement(self, tag, attributes):
            """Signal the start of an element.

            Args:
                tag: The element tag.
                attributes: The element attributes.
            """
            if tag == self.APP_DOC_ELEMENT:
                self.doc = DoclingDocument(name="file")
                self.text = ""
                # Start from a clean hierarchy so that no item of a previously
                # parsed document is used as a parent in the new one.
                self.level = 1
                self.parents = {1: None}
            self._start_registered_elements(tag, attributes)

        @override
        def skippedEntity(self, name):
            """Receive notification of a skipped entity.

            HTML entities will be skipped by the parser. This method will unescape them
            and add them to the text.

            Args:
                name: Entity name.
            """
            if not self.property:
                return

            element = self.Element(self.property[-1])
            if not element.is_text:
                return

            entity = f"&{name};"
            unescaped = html.unescape(self.style_html.get_greek_from_iso8879(entity))
            # Compare with the raw reference: the lookup table may already hold
            # the final character, which `html.unescape` leaves untouched.
            if unescaped == entity:
                _log.debug("Unrecognized HTML entity: " + name)
                return

            self._append_content(element, unescaped)

        @override
        def endElement(self, tag):
            """Signal the end of an element.

            Args:
                tag: The element tag.
            """
            if tag == self.APP_DOC_ELEMENT:
                self._clean_data()
            self._end_registered_element(tag)

        @override
        def characters(self, content):
            """Receive notification of character data.

            Args:
                content: Data reported by the handler.
            """
            if not self.property:
                return

            element = self.Element(self.property[-1])
            if element.is_text:
                self._append_content(element, content)

        def _append_content(self, element: "Element", content: str) -> None:
            """Append content found in a text element to the text buffer.

            Content inside a superscript or subscript tag is styled, and it is
            only kept when that tag is nested in another text element.

            Args:
                element: The innermost open element, which must be a text element.
                content: The text to append.
            """
            if element not in (
                self.Element.STYLE_SUPERSCRIPT,
                self.Element.STYLE_SUBSCRIPT,
            ):
                self.text += content
                return

            # A style tag needs an enclosing registered element.
            if len(self.property) < 2:
                return
            parent = self.Element(self.property[-2])
            if parent.is_text:
                self.text += self._apply_style(content, element.value)

        def _start_registered_elements(
            self, tag: str, attributes: xml.sax.xmlreader.AttributesImpl
        ) -> None:
            """Push a registered tag on the stack of open elements.

            Args:
                tag: The element tag.
                attributes: The element attributes.
            """
            if tag not in self._REGISTERED_TAGS:
                return

            if (
                tag == self.Element.CLAIM_TEXT.value
                and self.property
                and self.property[-1] == tag
                and self.text.strip()
            ):
                # Nested claim-text: save the text collected for the outer one
                # before the inner one starts.
                self.claim += " " + self.text.strip("\n")
                self.text = ""
            elif tag == self.Element.HEADING.value:
                level_attr: str = attributes.get("lvl", "")
                new_level: int = int(level_attr) if level_attr.isnumeric() else 1
                top_level = min(self.parents.keys())
                # Fall back to the top-most level when the requested level has
                # no known parent.
                self.level = (
                    new_level + 1 if (new_level + 1) in self.parents else top_level
                )
            self.property.append(tag)

        def _end_registered_element(self, tag: str) -> None:
            """Pop a registered tag from the stack and store its content.

            Args:
                tag: The element tag.
            """
            if tag in self._REGISTERED_TAGS and self.property:
                current_tag = self.property.pop()
                self._add_property(current_tag, self.text)

        def _add_property(self, name: str, text: str) -> None:
            """Add the content of a closed element to the document.

            Args:
                name: The tag name of the closed element.
                text: The text collected for the element.
            """
            if not name or not self.doc:
                return

            if name == self.Element.TITLE.value:
                title = text.strip()
                if title:
                    self.parents[self.level + 1] = self.doc.add_text(
                        parent=self.parents[self.level],
                        label=DocItemLabel.TITLE,
                        text=title,
                    )
                    self.level += 1
                self.text = ""
            elif name == self.Element.ABSTRACT.value:
                abstract = self.abstract.strip()
                if abstract:
                    heading_text = PatentHeading.ABSTRACT.value
                    heading_level = (
                        PatentHeading.ABSTRACT.level
                        if PatentHeading.ABSTRACT.level in self.parents
                        else 1
                    )
                    abstract_item = self.doc.add_heading(
                        heading_text,
                        level=heading_level,
                        parent=self.parents[heading_level],
                    )
                    self.doc.add_text(
                        label=DocItemLabel.PARAGRAPH,
                        text=self.abstract,
                        parent=abstract_item,
                    )
                self.abstract = ""
                self.text = ""
            elif name == self.Element.CLAIM_TEXT.value:
                if text:
                    self.claim += self.text.strip("\n")
                self.text = ""

            elif name == self.Element.CLAIM.value:
                claim = self.claim.strip()
                if claim:
                    self.claims.append(claim)
                self.claim = ""

            elif name == self.Element.CLAIMS.value and self.claims:
                heading_text = PatentHeading.CLAIMS.value
                heading_level = (
                    PatentHeading.CLAIMS.level
                    if PatentHeading.CLAIMS.level in self.parents
                    else 1
                )
                claims_item = self.doc.add_heading(
                    heading_text,
                    level=heading_level,
                    parent=self.parents[heading_level],
                )
                for claim_text in self.claims:
                    self.doc.add_text(
                        label=DocItemLabel.PARAGRAPH,
                        text=claim_text,
                        parent=claims_item,
                    )

            elif name in (
                self.Element.PARAGRAPH.value,
                self.Element.HEADING.value,
            ):
                if text and self.Element.ABSTRACT.value in self.property:
                    # Inside the abstract, the text is only accumulated; it is
                    # added to the document when the abstract element closes.
                    self.abstract = (self.abstract + text) if self.abstract else text
                elif text.strip():
                    text = re.sub("\\s+", " ", text).strip()
                    if name == self.Element.HEADING.value:
                        self.parents[self.level + 1] = self.doc.add_heading(
                            text=text,
                            level=self.level,
                            parent=self.parents[self.level],
                        )
                        self.level += 1
                    else:
                        self.doc.add_text(
                            label=DocItemLabel.PARAGRAPH,
                            text=text,
                            parent=self.parents[self.level],
                        )
                self.text = ""

            elif name == self.Element.TABLE.value:
                # Placeholder only: the table data is filled in afterwards by
                # PatentUsptoAppV1.parse.
                empty_table = TableData(num_rows=0, num_cols=0, table_cells=[])
                self.doc.add_table(
                    data=empty_table,
                    parent=self.parents[self.level],
                )

        def _apply_style(self, text: str, style_tag: str) -> str:
            """Apply an HTML style to text.

            Args:
                text: A string containing plain text.
                style_tag: An HTML tag name for styling text. If the tag name is not
                    recognized as one of the supported styles, the method will return
                    the original `text`.

            Returns:
                A string after applying the style.
            """
            formatted = html.unescape(text)

            if style_tag == self.Element.STYLE_SUPERSCRIPT.value:
                formatted = html.unescape(self.style_html.get_superscript(formatted))
            elif style_tag == self.Element.STYLE_SUBSCRIPT.value:
                formatted = html.unescape(self.style_html.get_subscript(formatted))

            return formatted

        def _clean_data(self):
            """Reset the variables from stream data."""
            self.property = []
            self.abstract = ""
            self.claim = ""
            self.claims = []
            self.text = ""
|
|
|
|
|
|
|
|
class XmlTable:
    """Provide a table parser for xml tables in USPTO patent documents.

    The OASIS Open XML Exchange Table Model can be downloaded from:
    http://oasis-open.org/specs/soextblx.dtd
    """

    class MinColInfoType(TypedDict):
        offset: list[int]
        colwidth: list[int]

    class ColInfoType(MinColInfoType):
        cell_range: list[int]
        cell_offst: list[int]

    def __init__(self, input: str) -> None:
        """Initialize the table parser with the xml content.

        Args:
            input: The xml content.
        """
        # Only the first `max_nbr_messages` column mismatches are logged.
        self.max_nbr_messages = 2
        self.nbr_messages = 0
        self.empty_text = ""
        self._soup = BeautifulSoup(input, features="xml")

    def _create_tg_range(self, tgs: list[dict[str, Any]]) -> dict[int, ColInfoType]:
        """Create a unified range along the table groups.

        The column offsets of all the table groups are merged into a single
        grid. For every group, `cell_range` then holds the number of grid
        columns covered by each of its columns and `cell_offst` the cumulative
        grid index at which each of its columns starts.

        Args:
            tgs: Table group column specifications.

        Returns:
            Unified group column specifications. The dictionary is empty if
            there are no table groups or if their offsets are inconsistent.
        """
        colinfo: dict[int, XmlTable.ColInfoType] = {}

        if not tgs:
            return colinfo

        for itg, tg in enumerate(tgs):
            colinfo[itg] = {
                "offset": [],
                "colwidth": [],
                "cell_range": [],
                "cell_offst": [0],
            }
            offst = 0
            for info in tg["colinfo"]:
                # Widths come with a unit suffix, which is dropped.
                cw = info["colwidth"]
                cw = re.sub("pt", "", cw, flags=re.I)
                cw = re.sub("mm", "", cw, flags=re.I)
                try:
                    cw = int(cw)
                except ValueError:
                    # Not an integer literal: try a decimal width instead.
                    cw = float(cw)
                colinfo[itg]["colwidth"].append(cw)
                colinfo[itg]["offset"].append(offst)
                offst += cw
            # Closing offset, i.e. the right edge of the last column.
            colinfo[itg]["offset"].append(offst)

        # Merge the offsets of all the groups into one sorted grid.
        min_offsets = colinfo[0]["offset"]
        offset_w0 = []
        for col in colinfo.values():
            # keep track of col with 0 width
            for ic, cw in enumerate(col["colwidth"]):
                if cw == 0:
                    offset_w0.append(col["offset"][ic])

            min_offsets = sorted(set(col["offset"] + min_offsets))

        # add back the 0 width cols to offset list, since the set above has
        # removed their duplicated offsets
        min_offsets = sorted(min_offsets + list(set(offset_w0)))

        for col in colinfo.values():
            i = 1
            range_ = 1
            for min_i in range(1, len(min_offsets)):
                min_offst = min_offsets[min_i]
                offst = col["offset"][i]
                if min_offst == offst:
                    if len(col["offset"]) == i + 1 and len(min_offsets) > min_i + 1:
                        # Last column of the group while the grid goes on: the
                        # column keeps extending.
                        range_ += 1
                    else:
                        col["cell_range"].append(range_)
                        col["cell_offst"].append(col["cell_offst"][-1] + range_)
                        range_ = 1
                        i += 1
                elif min_offst < offst:
                    range_ += 1
                else:
                    _log.debug("A USPTO XML table has wrong offsets.")
                    return {}

        return colinfo

    def _get_max_ncols(self, tgs_info: dict[int, ColInfoType]) -> NonNegativeInt:
        """Get the maximum number of columns across table groups.

        Args:
            tgs_info: Unified group column specifications.

        Return:
            The maximum number of columns.
        """
        ncols_max = 0
        for rowinfo in tgs_info.values():
            ncols_max = max(ncols_max, len(rowinfo["colwidth"]))

        return ncols_max

    @staticmethod
    def _new_cell(
        text: str, is_header: bool, row: int, col_start: int, col_end: int
    ) -> TableCell:
        """Build a cell on a single row, covering columns col_start to col_end - 1."""
        return TableCell(
            column_header=is_header,
            text=text,
            start_row_offset_idx=row,
            end_row_offset_idx=row + 1,
            row_span=1,
            start_col_offset_idx=col_start,
            end_col_offset_idx=col_end,
            col_span=col_end - col_start,
        )

    def _parse_table(self, table: Tag) -> TableData:
        """Parse the content of a table tag.

        Args:
            table: The table element.

        Returns:
            A docling table object.
        """
        tg_secs = table.find_all("tgroup")

        # Collect the column specifications of every table group.
        tgs_align = []
        for tg_sec in tg_secs:
            ncols = tg_sec.get("cols", None)
            if ncols:
                ncols = int(ncols)
            tg_align: dict[str, Any] = {"ncols": ncols, "colinfo": []}
            for cs_sec in tg_sec.find_all("colspec"):
                colname = cs_sec.get("colname", None)
                colwidth = cs_sec.get("colwidth", None)
                tg_align["colinfo"].append({"colname": colname, "colwidth": colwidth})

            tgs_align.append(tg_align)

        # create unified range along the table groups
        tgs_range = self._create_tg_range(tgs_align)

        # if the structure is broken, return an empty table
        if not tgs_range:
            return TableData(num_rows=0, num_cols=0, table_cells=[])

        ncols_max = self._get_max_ncols(tgs_range)

        # extract table data
        table_data: list[TableCell] = []
        i_row_global = 0
        for itg, tg_sec in enumerate(tg_secs):
            tg_range = tgs_range[itg]

            for row_sec in tg_sec.find_all(["row", "tr"]):
                entry_secs = row_sec.find_all(["entry", "td"])
                if not entry_secs:
                    continue
                is_header: bool = row_sec.parent.name in ["thead"]

                ncols = 0
                local_row: list[TableCell] = []
                is_row_empty = True
                wrong_nbr_cols = False
                for ientry, entry_sec in enumerate(entry_secs):
                    text = entry_sec.get_text().strip()

                    # start-end columns of the entry; without numeric
                    # attributes, the entry takes the column at its position
                    namest = entry_sec.attrs.get("namest", None)
                    nameend = entry_sec.attrs.get("nameend", None)
                    if isinstance(namest, str) and namest.isnumeric():
                        namest = int(namest)
                    else:
                        namest = ientry + 1
                    if isinstance(nameend, str) and nameend.isnumeric():
                        nameend = int(nameend)
                        shift = 0
                    else:
                        nameend = ientry + 2
                        shift = 1

                    if nameend > len(tg_range["cell_offst"]):
                        wrong_nbr_cols = True
                        self.nbr_messages += 1
                        if self.nbr_messages <= self.max_nbr_messages:
                            _log.debug("USPTO table has # entries != # columns")
                        break

                    range_ = [
                        tg_range["cell_offst"][namest - 1],
                        tg_range["cell_offst"][nameend - 1] - shift,
                    ]

                    # add row and replicate cell if needed
                    cell_text = text if text else self.empty_text
                    if cell_text != self.empty_text:
                        is_row_empty = False
                    for _ in range(range_[0], range_[1] + 1):
                        ncols += 1
                        local_row.append(
                            self._new_cell(
                                cell_text,
                                is_header,
                                i_row_global,
                                range_[0],
                                range_[1] + 1,
                            )
                        )

                if wrong_nbr_cols:
                    # keep empty text, not to introduce noise
                    local_row = []
                    ncols = 0

                # add empty cell up to ncols_max
                for irep in range(ncols, ncols_max):
                    local_row.append(
                        self._new_cell(
                            self.empty_text, is_header, i_row_global, irep, irep + 1
                        )
                    )

                # do not add empty rows
                if not is_row_empty:
                    table_data.extend(local_row)
                    i_row_global += 1

        return TableData(
            num_rows=i_row_global, num_cols=ncols_max, table_cells=table_data
        )

    def parse(self) -> Optional[TableData]:
        """Parse the first table from an xml content.

        Returns:
            A docling table data, or None if the content has no table element.
        """
        section = self._soup.find("table")
        if section is None:
            return None

        table = self._parse_table(section)
        if table.num_rows == 0 or table.num_cols == 0:
            _log.warning("The parsed USPTO table is empty")
        return table
|
|
|
|
|
|
|
|
class HtmlEntity:
    """Provide utility functions to get the styled form of characters.

    The class holds translation tables for superscript, subscript and
    mathematical italic characters, and a lookup table from ISO 8879 Greek
    entity references to Greek letters.

    This class has been developed from:
    https://unicode-table.com/en/html-entities/
    https://www.w3.org/TR/WD-math-970515/table03.html
    """

    def __init__(self):
        """Initialize this class by loading the character tables."""
        self.superscript = str.maketrans(
            {
                "1": "¹",
                "2": "²",
                "3": "³",
                "4": "⁴",
                "5": "⁵",
                "6": "⁶",
                "7": "⁷",
                "8": "⁸",
                "9": "⁹",
                "0": "⁰",
                "+": "⁺",
                "-": "⁻",
                "\u2212": "⁻",  # MINUS SIGN, as opposed to the hyphen above
                "=": "⁼",
                "(": "⁽",
                ")": "⁾",
                "a": "ª",
                "o": "º",
                "i": "ⁱ",
                "n": "ⁿ",
            }
        )
        self.subscript = str.maketrans(
            {
                "1": "₁",
                "2": "₂",
                "3": "₃",
                "4": "₄",
                "5": "₅",
                "6": "₆",
                "7": "₇",
                "8": "₈",
                "9": "₉",
                "0": "₀",
                "+": "₊",
                "-": "₋",
                "\u2212": "₋",  # MINUS SIGN, as opposed to the hyphen above
                "=": "₌",
                "(": "₍",
                ")": "₎",
                "a": "ₐ",
                "e": "ₑ",
                "o": "ₒ",
                "x": "ₓ",
            }
        )

        # The mathematical italic letters are contiguous in Unicode, starting
        # at U+1D434 (capital A) and U+1D44E (small a). Building the table from
        # these ranges guarantees that no letter is left out.
        italic = {chr(ord("A") + idx): chr(0x1D434 + idx) for idx in range(26)}
        italic.update({chr(ord("a") + idx): chr(0x1D44E + idx) for idx in range(26)})
        # The only exception is the small h: its slot U+1D455 is reserved,
        # since the letter already existed as U+210E (PLANCK CONSTANT).
        italic["h"] = "\u210e"
        self.mathematical_italic = str.maketrans(italic)

        # Greek letters by ISO 8879 entity reference. The values are written as
        # escapes because many Greek capitals are indistinguishable from Latin
        # letters in source code.
        self.lookup_iso8879 = {
            "&Agr;": "\u0391",  # capital alpha
            "&Bgr;": "\u0392",  # capital beta
            "&Ggr;": "\u0393",  # capital gamma
            "&Dgr;": "\u0394",  # capital delta
            "&Egr;": "\u0395",  # capital epsilon
            "&Zgr;": "\u0396",  # capital zeta
            "&EEgr;": "\u0397",  # capital eta
            "&THgr;": "\u0398",  # capital theta
            "&Igr;": "\u0399",  # capital iota
            "&Kgr;": "\u039a",  # capital kappa
            "&Lgr;": "\u039b",  # capital lambda
            "&Mgr;": "\u039c",  # capital mu
            "&Ngr;": "\u039d",  # capital nu
            "&Xgr;": "\u039e",  # capital xi
            "&Ogr;": "\u039f",  # capital omicron
            "&Pgr;": "\u03a0",  # capital pi
            "&Rgr;": "\u03a1",  # capital rho
            "&Sgr;": "\u03a3",  # capital sigma
            "&Tgr;": "\u03a4",  # capital tau
            "&Ugr;": "\u03a5",  # capital upsilon
            "&PHgr;": "\u03a6",  # capital phi
            "&KHgr;": "\u03a7",  # capital chi
            "&PSgr;": "\u03a8",  # capital psi
            "&OHgr;": "\u03a9",  # capital omega
            "&agr;": "\u03b1",  # small alpha
            "&bgr;": "\u03b2",  # small beta
            "&ggr;": "\u03b3",  # small gamma
            "&dgr;": "\u03b4",  # small delta
            "&egr;": "\u03b5",  # small epsilon
            "&zgr;": "\u03b6",  # small zeta
            "&eegr;": "\u03b7",  # small eta
            "&thgr;": "\u03b8",  # small theta
            "&igr;": "\u03b9",  # small iota
            "&kgr;": "\u03ba",  # small kappa
            "&lgr;": "\u03bb",  # small lambda
            "&mgr;": "\u03bc",  # small mu
            "&ngr;": "\u03bd",  # small nu
            "&xgr;": "\u03be",  # small xi
            "&ogr;": "\u03bf",  # small omicron
            "&pgr;": "\u03c0",  # small pi
            "&rgr;": "\u03c1",  # small rho
            "&sgr;": "\u03c3",  # small sigma
            "&sfgr;": "\u03c2",  # small final sigma
            "&tgr;": "\u03c4",  # small tau
            "&ugr;": "\u03c5",  # small upsilon
            "&phgr;": "\u03c6",  # small phi
            "&khgr;": "\u03c7",  # small chi
            "&psgr;": "\u03c8",  # small psi
            "&ohgr;": "\u03c9",  # small omega
        }

    def get_superscript(self, text: str) -> str:
        """Get a text in superscript.

        Args:
            text: The text to transform.

        Returns:
            The text with every supported character replaced by its superscript
            form. The other characters are left unchanged.
        """
        return text.translate(self.superscript)

    def get_subscript(self, text: str) -> str:
        """Get a text in subscript.

        Args:
            text: The text to transform.

        Returns:
            The text with every supported character replaced by its subscript
            form. The other characters are left unchanged.
        """
        return text.translate(self.subscript)

    def get_math_italic(self, text: str) -> str:
        """Get a text in mathematical italic.

        Args:
            text: The text to transform.

        Returns:
            The text with every basic Latin letter replaced by its mathematical
            italic form. The other characters are left unchanged.
        """
        return text.translate(self.mathematical_italic)

    def get_greek_from_iso8879(self, text: str) -> str:
        """Get the Greek letter of an ISO 8879 entity reference.

        Args:
            text: The text to transform, as an ISO 8879 entity reference
                including the leading `&` and the trailing `;`.

        Returns:
            The Greek letter represented by the entity. If the input text is not
            supported, the original text is returned.
        """
        return self.lookup_iso8879.get(text, text)
|
|
|