|
|
import logging |
|
|
from io import BytesIO |
|
|
from pathlib import Path |
|
|
from typing import Any, Set, Union |
|
|
|
|
|
import lxml |
|
|
from bs4 import BeautifulSoup |
|
|
from docling_core.types.doc import ( |
|
|
DocItemLabel, |
|
|
DoclingDocument, |
|
|
DocumentOrigin, |
|
|
GroupLabel, |
|
|
TableCell, |
|
|
TableData, |
|
|
) |
|
|
from lxml import etree |
|
|
from typing_extensions import TypedDict, override |
|
|
|
|
|
from docling.backend.abstract_backend import DeclarativeDocumentBackend |
|
|
from docling.datamodel.base_models import InputFormat |
|
|
from docling.datamodel.document import InputDocument |
|
|
|
|
|
_log = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
# A body paragraph plus the chain of section headings it appears under
# (innermost heading first, as collected by _parse_main_text).
Paragraph = TypedDict("Paragraph", {"text": str, "headers": list[str]})
|
|
|
|
|
|
|
|
# One article contributor: display name plus resolved affiliation texts.
Author = TypedDict("Author", {"name": str, "affiliation_names": list[str]})
|
|
|
|
|
|
|
|
# One <table-wrap>: its label (e.g. "Table 1"), caption text, and the raw
# serialized <table> markup for later HTML parsing.
Table = TypedDict("Table", {"label": str, "caption": str, "content": str})
|
|
|
|
|
|
|
|
# One figure's label (e.g. "Figure 1") and its caption text.
FigureCaption = TypedDict("FigureCaption", {"label": str, "caption": str})
|
|
|
|
|
|
|
|
# One bibliography entry, already flattened to display strings.
Reference = TypedDict(
    "Reference",
    {"author_names": str, "title": str, "journal": str, "year": str},
)
|
|
|
|
|
|
|
|
class XMLComponents(TypedDict):
    """Aggregated result of parsing one PubMed/JATS XML article."""

    title: str  # article title, "" when absent
    authors: list[Author]
    abstract: str  # concatenated text of all <abstract> elements
    paragraphs: list[Paragraph]  # body paragraphs with their heading chains
    tables: list[Table]
    figure_captions: list[FigureCaption]
    references: list[Reference]
|
|
|
|
|
|
|
|
class PubMedDocumentBackend(DeclarativeDocumentBackend): |
|
|
""" |
|
|
The code from this document backend has been developed by modifying parts of the PubMed Parser library (version 0.5.0, released on 12.08.2024): |
|
|
Achakulvisut et al., (2020). |
|
|
    Pubmed Parser: A Python Parser for PubMed Open-Access XML Subset and MEDLINE XML Dataset.
|
|
Journal of Open Source Software, 5(46), 1979, |
|
|
https://doi.org/10.21105/joss.01979 |
|
|
""" |
|
|
|
|
|
@override |
|
|
def __init__(self, in_doc: "InputDocument", path_or_stream: Union[BytesIO, Path]): |
|
|
super().__init__(in_doc, path_or_stream) |
|
|
self.path_or_stream = path_or_stream |
|
|
|
|
|
|
|
|
self.parents: dict = {} |
|
|
|
|
|
self.valid = False |
|
|
try: |
|
|
if isinstance(self.path_or_stream, BytesIO): |
|
|
self.path_or_stream.seek(0) |
|
|
self.tree: lxml.etree._ElementTree = etree.parse(self.path_or_stream) |
|
|
if "/NLM//DTD JATS" in self.tree.docinfo.public_id: |
|
|
self.valid = True |
|
|
except Exception as exc: |
|
|
raise RuntimeError( |
|
|
f"Could not initialize PubMed backend for file with hash {self.document_hash}." |
|
|
) from exc |
|
|
|
|
|
@override |
|
|
def is_valid(self) -> bool: |
|
|
return self.valid |
|
|
|
|
|
@classmethod |
|
|
@override |
|
|
def supports_pagination(cls) -> bool: |
|
|
return False |
|
|
|
|
|
@override |
|
|
def unload(self): |
|
|
if isinstance(self.path_or_stream, BytesIO): |
|
|
self.path_or_stream.close() |
|
|
self.path_or_stream = None |
|
|
|
|
|
@classmethod |
|
|
@override |
|
|
def supported_formats(cls) -> Set[InputFormat]: |
|
|
return {InputFormat.XML_PUBMED} |
|
|
|
|
|
@override |
|
|
def convert(self) -> DoclingDocument: |
|
|
|
|
|
origin = DocumentOrigin( |
|
|
filename=self.file.name or "file", |
|
|
mimetype="application/xml", |
|
|
binary_hash=self.document_hash, |
|
|
) |
|
|
doc = DoclingDocument(name=self.file.stem or "file", origin=origin) |
|
|
|
|
|
_log.debug("Trying to convert PubMed XML document...") |
|
|
|
|
|
|
|
|
xml_components: XMLComponents = self._parse() |
|
|
|
|
|
|
|
|
doc = self._populate_document(doc, xml_components) |
|
|
return doc |
|
|
|
|
|
def _parse_title(self) -> str: |
|
|
title: str = " ".join( |
|
|
[ |
|
|
t.replace("\n", "") |
|
|
for t in self.tree.xpath(".//title-group/article-title")[0].itertext() |
|
|
] |
|
|
) |
|
|
return title |
|
|
|
|
|
def _parse_authors(self) -> list[Author]: |
|
|
|
|
|
affiliation_names = [] |
|
|
for affiliation_node in self.tree.xpath(".//aff[@id]"): |
|
|
affiliation_names.append( |
|
|
": ".join([t for t in affiliation_node.itertext() if t != "\n"]) |
|
|
) |
|
|
affiliation_ids_names = { |
|
|
id: name |
|
|
for id, name in zip(self.tree.xpath(".//aff[@id]/@id"), affiliation_names) |
|
|
} |
|
|
|
|
|
|
|
|
authors: list[Author] = [] |
|
|
for author_node in self.tree.xpath( |
|
|
'.//contrib-group/contrib[@contrib-type="author"]' |
|
|
): |
|
|
author: Author = { |
|
|
"name": "", |
|
|
"affiliation_names": [], |
|
|
} |
|
|
|
|
|
|
|
|
affiliation_ids = [ |
|
|
a.attrib["rid"] for a in author_node.xpath('xref[@ref-type="aff"]') |
|
|
] |
|
|
for id in affiliation_ids: |
|
|
if id in affiliation_ids_names: |
|
|
author["affiliation_names"].append(affiliation_ids_names[id]) |
|
|
|
|
|
|
|
|
author["name"] = ( |
|
|
author_node.xpath("name/surname")[0].text |
|
|
+ " " |
|
|
+ author_node.xpath("name/given-names")[0].text |
|
|
) |
|
|
|
|
|
authors.append(author) |
|
|
return authors |
|
|
|
|
|
def _parse_abstract(self) -> str: |
|
|
texts = [] |
|
|
for abstract_node in self.tree.xpath(".//abstract"): |
|
|
for text in abstract_node.itertext(): |
|
|
texts.append(text.replace("\n", "")) |
|
|
abstract: str = "".join(texts) |
|
|
return abstract |
|
|
|
|
|
def _parse_main_text(self) -> list[Paragraph]: |
|
|
paragraphs: list[Paragraph] = [] |
|
|
for paragraph_node in self.tree.xpath("//body//p"): |
|
|
|
|
|
if "/caption" in paragraph_node.getroottree().getpath(paragraph_node): |
|
|
continue |
|
|
|
|
|
paragraph: Paragraph = {"text": "", "headers": []} |
|
|
|
|
|
|
|
|
paragraph["text"] = "".join( |
|
|
[t.replace("\n", "") for t in paragraph_node.itertext()] |
|
|
) |
|
|
|
|
|
|
|
|
path = "../title" |
|
|
while len(paragraph_node.xpath(path)) > 0: |
|
|
paragraph["headers"].append( |
|
|
"".join( |
|
|
[ |
|
|
t.replace("\n", "") |
|
|
for t in paragraph_node.xpath(path)[0].itertext() |
|
|
] |
|
|
) |
|
|
) |
|
|
path = "../" + path |
|
|
|
|
|
paragraphs.append(paragraph) |
|
|
|
|
|
return paragraphs |
|
|
|
|
|
def _parse_tables(self) -> list[Table]: |
|
|
tables: list[Table] = [] |
|
|
for table_node in self.tree.xpath(".//body//table-wrap"): |
|
|
table: Table = {"label": "", "caption": "", "content": ""} |
|
|
|
|
|
|
|
|
if len(table_node.xpath("table")) > 0: |
|
|
table_content_node = table_node.xpath("table")[0] |
|
|
elif len(table_node.xpath("alternatives/table")) > 0: |
|
|
table_content_node = table_node.xpath("alternatives/table")[0] |
|
|
else: |
|
|
table_content_node = None |
|
|
if table_content_node != None: |
|
|
table["content"] = etree.tostring(table_content_node).decode("utf-8") |
|
|
|
|
|
|
|
|
if len(table_node.xpath("caption/p")) > 0: |
|
|
caption_node = table_node.xpath("caption/p")[0] |
|
|
elif len(table_node.xpath("caption/title")) > 0: |
|
|
caption_node = table_node.xpath("caption/title")[0] |
|
|
else: |
|
|
caption_node = None |
|
|
if caption_node != None: |
|
|
table["caption"] = "".join( |
|
|
[t.replace("\n", "") for t in caption_node.itertext()] |
|
|
) |
|
|
|
|
|
|
|
|
if len(table_node.xpath("label")) > 0: |
|
|
table["label"] = table_node.xpath("label")[0].text |
|
|
|
|
|
tables.append(table) |
|
|
return tables |
|
|
|
|
|
def _parse_figure_captions(self) -> list[FigureCaption]: |
|
|
figure_captions: list[FigureCaption] = [] |
|
|
|
|
|
if not (self.tree.xpath(".//fig")): |
|
|
return figure_captions |
|
|
|
|
|
for figure_node in self.tree.xpath(".//fig"): |
|
|
figure_caption: FigureCaption = { |
|
|
"caption": "", |
|
|
"label": "", |
|
|
} |
|
|
|
|
|
|
|
|
if figure_node.xpath("label"): |
|
|
figure_caption["label"] = "".join( |
|
|
[ |
|
|
t.replace("\n", "") |
|
|
for t in figure_node.xpath("label")[0].itertext() |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
if figure_node.xpath("caption"): |
|
|
caption = "" |
|
|
for caption_node in figure_node.xpath("caption")[0].getchildren(): |
|
|
caption += ( |
|
|
"".join([t.replace("\n", "") for t in caption_node.itertext()]) |
|
|
+ "\n" |
|
|
) |
|
|
figure_caption["caption"] = caption |
|
|
|
|
|
figure_captions.append(figure_caption) |
|
|
|
|
|
return figure_captions |
|
|
|
|
|
def _parse_references(self) -> list[Reference]: |
|
|
references: list[Reference] = [] |
|
|
for reference_node_abs in self.tree.xpath(".//ref-list/ref"): |
|
|
reference: Reference = { |
|
|
"author_names": "", |
|
|
"title": "", |
|
|
"journal": "", |
|
|
"year": "", |
|
|
} |
|
|
reference_node: Any = None |
|
|
for tag in ["mixed-citation", "element-citation", "citation"]: |
|
|
if len(reference_node_abs.xpath(tag)) > 0: |
|
|
reference_node = reference_node_abs.xpath(tag)[0] |
|
|
break |
|
|
|
|
|
if reference_node is None: |
|
|
continue |
|
|
|
|
|
if all( |
|
|
not (ref_type in ["citation-type", "publication-type"]) |
|
|
for ref_type in reference_node.attrib.keys() |
|
|
): |
|
|
continue |
|
|
|
|
|
|
|
|
names = [] |
|
|
if len(reference_node.xpath("name")) > 0: |
|
|
for name_node in reference_node.xpath("name"): |
|
|
name_str = " ".join( |
|
|
[t.text for t in name_node.getchildren() if (t.text != None)] |
|
|
) |
|
|
names.append(name_str) |
|
|
elif len(reference_node.xpath("person-group")) > 0: |
|
|
for name_node in reference_node.xpath("person-group")[0]: |
|
|
name_str = ( |
|
|
name_node.xpath("given-names")[0].text |
|
|
+ " " |
|
|
+ name_node.xpath("surname")[0].text |
|
|
) |
|
|
names.append(name_str) |
|
|
reference["author_names"] = "; ".join(names) |
|
|
|
|
|
|
|
|
if len(reference_node.xpath("article-title")) > 0: |
|
|
reference["title"] = " ".join( |
|
|
[ |
|
|
t.replace("\n", " ") |
|
|
for t in reference_node.xpath("article-title")[0].itertext() |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
if len(reference_node.xpath("source")) > 0: |
|
|
reference["journal"] = reference_node.xpath("source")[0].text |
|
|
|
|
|
|
|
|
if len(reference_node.xpath("year")) > 0: |
|
|
reference["year"] = reference_node.xpath("year")[0].text |
|
|
|
|
|
if ( |
|
|
not (reference_node.xpath("article-title")) |
|
|
and not (reference_node.xpath("journal")) |
|
|
and not (reference_node.xpath("year")) |
|
|
): |
|
|
reference["title"] = reference_node.text |
|
|
|
|
|
references.append(reference) |
|
|
return references |
|
|
|
|
|
def _parse(self) -> XMLComponents: |
|
|
"""Parsing PubMed document.""" |
|
|
xml_components: XMLComponents = { |
|
|
"title": self._parse_title(), |
|
|
"authors": self._parse_authors(), |
|
|
"abstract": self._parse_abstract(), |
|
|
"paragraphs": self._parse_main_text(), |
|
|
"tables": self._parse_tables(), |
|
|
"figure_captions": self._parse_figure_captions(), |
|
|
"references": self._parse_references(), |
|
|
} |
|
|
return xml_components |
|
|
|
|
|
    def _populate_document(
        self, doc: DoclingDocument, xml_components: XMLComponents
    ) -> DoclingDocument:
        """Populate *doc* from the parsed XML components.

        Order matters: _add_title must run first because it creates the
        "Title" parent item that every later section attaches to.
        """
        self._add_title(doc, xml_components)
        self._add_authors(doc, xml_components)
        self._add_abstract(doc, xml_components)
        self._add_main_text(doc, xml_components)

        # Only create "Tables"/"Figures" sections when there is content.
        if xml_components["tables"]:
            self._add_tables(doc, xml_components)

        if xml_components["figure_captions"]:
            self._add_figure_captions(doc, xml_components)

        self._add_references(doc, xml_components)
        return doc
|
|
|
|
|
def _add_figure_captions( |
|
|
self, doc: DoclingDocument, xml_components: XMLComponents |
|
|
) -> None: |
|
|
self.parents["Figures"] = doc.add_heading( |
|
|
parent=self.parents["Title"], text="Figures" |
|
|
) |
|
|
for figure_caption_xml_component in xml_components["figure_captions"]: |
|
|
figure_caption_text = ( |
|
|
figure_caption_xml_component["label"] |
|
|
+ ": " |
|
|
+ figure_caption_xml_component["caption"].strip() |
|
|
) |
|
|
fig_caption = doc.add_text( |
|
|
label=DocItemLabel.CAPTION, text=figure_caption_text |
|
|
) |
|
|
doc.add_picture( |
|
|
parent=self.parents["Figures"], |
|
|
caption=fig_caption, |
|
|
) |
|
|
return |
|
|
|
|
|
def _add_title(self, doc: DoclingDocument, xml_components: XMLComponents) -> None: |
|
|
self.parents["Title"] = doc.add_text( |
|
|
parent=None, |
|
|
text=xml_components["title"], |
|
|
label=DocItemLabel.TITLE, |
|
|
) |
|
|
return |
|
|
|
|
|
def _add_authors(self, doc: DoclingDocument, xml_components: XMLComponents) -> None: |
|
|
authors_affiliations: list = [] |
|
|
for author in xml_components["authors"]: |
|
|
authors_affiliations.append(author["name"]) |
|
|
authors_affiliations.append(", ".join(author["affiliation_names"])) |
|
|
authors_affiliations_str = "; ".join(authors_affiliations) |
|
|
|
|
|
doc.add_text( |
|
|
parent=self.parents["Title"], |
|
|
text=authors_affiliations_str, |
|
|
label=DocItemLabel.PARAGRAPH, |
|
|
) |
|
|
return |
|
|
|
|
|
def _add_abstract( |
|
|
self, doc: DoclingDocument, xml_components: XMLComponents |
|
|
) -> None: |
|
|
abstract_text: str = xml_components["abstract"] |
|
|
self.parents["Abstract"] = doc.add_heading( |
|
|
parent=self.parents["Title"], text="Abstract" |
|
|
) |
|
|
doc.add_text( |
|
|
parent=self.parents["Abstract"], |
|
|
text=abstract_text, |
|
|
label=DocItemLabel.TEXT, |
|
|
) |
|
|
return |
|
|
|
|
|
def _add_main_text( |
|
|
self, doc: DoclingDocument, xml_components: XMLComponents |
|
|
) -> None: |
|
|
added_headers: list = [] |
|
|
for paragraph in xml_components["paragraphs"]: |
|
|
if not (paragraph["headers"]): |
|
|
continue |
|
|
|
|
|
|
|
|
for i, header in enumerate(reversed(paragraph["headers"])): |
|
|
if header in added_headers: |
|
|
continue |
|
|
added_headers.append(header) |
|
|
|
|
|
if ((i - 1) >= 0) and list(reversed(paragraph["headers"]))[ |
|
|
i - 1 |
|
|
] in self.parents: |
|
|
parent = self.parents[list(reversed(paragraph["headers"]))[i - 1]] |
|
|
else: |
|
|
parent = self.parents["Title"] |
|
|
|
|
|
self.parents[header] = doc.add_heading(parent=parent, text=header) |
|
|
|
|
|
|
|
|
if paragraph["headers"][0] in self.parents: |
|
|
parent = self.parents[paragraph["headers"][0]] |
|
|
else: |
|
|
parent = self.parents["Title"] |
|
|
|
|
|
doc.add_text(parent=parent, label=DocItemLabel.TEXT, text=paragraph["text"]) |
|
|
return |
|
|
|
|
|
def _add_references( |
|
|
self, doc: DoclingDocument, xml_components: XMLComponents |
|
|
) -> None: |
|
|
self.parents["References"] = doc.add_heading( |
|
|
parent=self.parents["Title"], text="References" |
|
|
) |
|
|
current_list = doc.add_group( |
|
|
parent=self.parents["References"], label=GroupLabel.LIST, name="list" |
|
|
) |
|
|
for reference in xml_components["references"]: |
|
|
reference_text: str = "" |
|
|
if reference["author_names"]: |
|
|
reference_text += reference["author_names"] + ". " |
|
|
|
|
|
if reference["title"]: |
|
|
reference_text += reference["title"] |
|
|
if reference["title"][-1] != ".": |
|
|
reference_text += "." |
|
|
reference_text += " " |
|
|
|
|
|
if reference["journal"]: |
|
|
reference_text += reference["journal"] |
|
|
|
|
|
if reference["year"]: |
|
|
reference_text += " (" + reference["year"] + ")" |
|
|
|
|
|
if not (reference_text): |
|
|
_log.debug(f"Skipping reference for: {str(self.file)}") |
|
|
continue |
|
|
|
|
|
doc.add_list_item( |
|
|
text=reference_text, enumerated=False, parent=current_list |
|
|
) |
|
|
return |
|
|
|
|
|
def _add_tables(self, doc: DoclingDocument, xml_components: XMLComponents) -> None: |
|
|
self.parents["Tables"] = doc.add_heading( |
|
|
parent=self.parents["Title"], text="Tables" |
|
|
) |
|
|
for table_xml_component in xml_components["tables"]: |
|
|
try: |
|
|
self._add_table(doc, table_xml_component) |
|
|
except Exception as e: |
|
|
_log.debug(f"Skipping unsupported table for: {str(self.file)}") |
|
|
pass |
|
|
return |
|
|
|
|
|
def _add_table(self, doc: DoclingDocument, table_xml_component: Table) -> None: |
|
|
soup = BeautifulSoup(table_xml_component["content"], "html.parser") |
|
|
table_tag = soup.find("table") |
|
|
|
|
|
nested_tables = table_tag.find("table") |
|
|
if nested_tables: |
|
|
_log.debug(f"Skipping nested table for: {str(self.file)}") |
|
|
return |
|
|
|
|
|
|
|
|
num_rows = len(table_tag.find_all("tr")) |
|
|
|
|
|
|
|
|
num_cols = 0 |
|
|
for row in table_tag.find_all("tr"): |
|
|
col_count = 0 |
|
|
for cell in row.find_all(["td", "th"]): |
|
|
colspan = int(cell.get("colspan", 1)) |
|
|
col_count += colspan |
|
|
num_cols = max(num_cols, col_count) |
|
|
|
|
|
grid = [[None for _ in range(num_cols)] for _ in range(num_rows)] |
|
|
|
|
|
data = TableData(num_rows=num_rows, num_cols=num_cols, table_cells=[]) |
|
|
|
|
|
|
|
|
for row_idx, row in enumerate(table_tag.find_all("tr")): |
|
|
|
|
|
cells = row.find_all(["td", "th"]) |
|
|
|
|
|
|
|
|
col_header = True |
|
|
for j, html_cell in enumerate(cells): |
|
|
if html_cell.name == "td": |
|
|
col_header = False |
|
|
|
|
|
|
|
|
col_idx = 0 |
|
|
for _, html_cell in enumerate(cells): |
|
|
text = html_cell.text |
|
|
|
|
|
col_span = int(html_cell.get("colspan", 1)) |
|
|
row_span = int(html_cell.get("rowspan", 1)) |
|
|
|
|
|
while grid[row_idx][col_idx] != None: |
|
|
col_idx += 1 |
|
|
for r in range(row_span): |
|
|
for c in range(col_span): |
|
|
grid[row_idx + r][col_idx + c] = text |
|
|
|
|
|
cell = TableCell( |
|
|
text=text, |
|
|
row_span=row_span, |
|
|
col_span=col_span, |
|
|
start_row_offset_idx=row_idx, |
|
|
end_row_offset_idx=row_idx + row_span, |
|
|
start_col_offset_idx=col_idx, |
|
|
end_col_offset_idx=col_idx + col_span, |
|
|
col_header=col_header, |
|
|
row_header=((not col_header) and html_cell.name == "th"), |
|
|
) |
|
|
data.table_cells.append(cell) |
|
|
|
|
|
table_caption = doc.add_text( |
|
|
label=DocItemLabel.CAPTION, |
|
|
text=table_xml_component["label"] + ": " + table_xml_component["caption"], |
|
|
) |
|
|
doc.add_table(data=data, parent=self.parents["Tables"], caption=table_caption) |
|
|
return |
|
|
|