# NOTE: web-capture residue ("Spaces: Runtime error") -- not part of the module.
import io
import logging
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional, Sequence, Union

import requests
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
from langchain_core.pydantic_v1 import BaseModel, root_validator
# Fully-qualified (Clark notation) XHTML element tags as they appear in a
# parsed XML tree.
TD_NAME = "{http://www.w3.org/1999/xhtml}td"
TABLE_NAME = "{http://www.w3.org/1999/xhtml}table"

# Keys used in the metadata dict attached to every emitted chunk.
XPATH_KEY = "xpath"
DOCUMENT_ID_KEY = "id"
DOCUMENT_SOURCE_KEY = "source"
DOCUMENT_NAME_KEY = "name"
STRUCTURE_KEY = "structure"
TAG_KEY = "tag"

# Key naming the project collection in API payloads.
PROJECTS_KEY = "projects"

# Base URL used for remote API calls unless overridden.
DEFAULT_API_ENDPOINT = "https://api.docugami.com/v1preview1"

logger = logging.getLogger(__name__)
class DocugamiLoader(BaseLoader, BaseModel):
    """Load from `Docugami`.

    To use, you should have the ``lxml`` python package installed.

    The loader works in one of two mutually exclusive modes:

    * remote: ``docset_id`` (plus ``access_token``) is given and DGML is
      downloaded from the API at ``api``;
    * local: ``file_paths`` is given and DGML is read from disk.
    """

    api: str = DEFAULT_API_ENDPOINT
    """The Docugami API endpoint to use."""

    access_token: Optional[str] = os.environ.get("DOCUGAMI_API_KEY")
    """The Docugami API access token to use."""

    docset_id: Optional[str]
    """The Docugami API docset ID to use."""

    document_ids: Optional[Sequence[str]]
    """The Docugami API document IDs to use."""

    file_paths: Optional[Sequence[Union[Path, str]]]
    """The local file paths to use."""

    min_chunk_size: int = 32  # appended to the next chunk to avoid over-chunking
    """The minimum chunk size to use when parsing DGML. Defaults to 32."""

    @root_validator
    def validate_local_or_remote(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that either local file paths are given, or remote API docset ID.

        Args:
            values: The values to validate.

        Returns:
            The validated values.

        Raises:
            ValueError: If both or neither of ``file_paths`` / ``docset_id``
                are given, or if ``docset_id`` is given without an access
                token.
        """
        has_files = bool(values.get("file_paths"))
        has_docset = bool(values.get("docset_id"))

        if has_files and has_docset:
            raise ValueError("Cannot specify both file_paths and remote API docset_id")

        if not has_files and not has_docset:
            raise ValueError("Must specify either file_paths or remote API docset_id")

        if has_docset and not values.get("access_token"):
            raise ValueError("Must specify access token if using remote API docset_id")

        return values

    def _parse_dgml(
        self, document: Mapping, content: bytes, doc_metadata: Optional[Mapping] = None
    ) -> List[Document]:
        """Parse a single DGML document into a list of Documents.

        Args:
            document: Mapping with at least the ``id`` and ``name`` keys; both
                are copied into every chunk's metadata (the name is also used
                as the ``source``).
            content: Raw XML bytes of the DGML document.
            doc_metadata: Optional extra metadata merged into every chunk's
                metadata; its keys override the generated ones.

        Returns:
            One Document per leaf structural node, with headings and chunks
            shorter than ``min_chunk_size`` merged into the following chunk.

        Raises:
            ImportError: If ``lxml`` is not installed.
        """
        try:
            from lxml import etree
        except ImportError as exc:
            raise ImportError(
                "Could not import lxml python package. "
                "Please install it with `pip install lxml`."
            ) from exc

        # helpers
        def _xpath_qname_for_chunk(chunk: Any) -> str:
            """Get the xpath qname for a chunk."""
            qname = f"{chunk.prefix}:{chunk.tag.split('}')[-1]}"

            parent = chunk.getparent()
            if parent is not None:
                # Disambiguate same-tag siblings with a 1-based positional index.
                doppelgangers = [x for x in parent if x.tag == chunk.tag]
                if len(doppelgangers) > 1:
                    idx_of_self = doppelgangers.index(chunk)
                    qname = f"{qname}[{idx_of_self + 1}]"

            return qname

        def _xpath_for_chunk(chunk: Any) -> str:
            """Get the xpath for a chunk."""
            ancestor_chain = chunk.xpath("ancestor-or-self::*")
            return "/" + "/".join(_xpath_qname_for_chunk(x) for x in ancestor_chain)

        def _structure_value(node: Any) -> Optional[str]:
            """Get the structure value for a node, or None if it has none."""
            if node.tag == TABLE_NAME:
                return "table"
            return node.attrib.get("structure")

        def _is_structural(node: Any) -> bool:
            """Check if a node is structural."""
            return _structure_value(node) is not None

        def _is_heading(node: Any) -> bool:
            """Check if a node is a heading."""
            structure = _structure_value(node)
            return structure is not None and structure.lower().startswith("h")

        def _get_text(node: Any) -> str:
            """Get the text of a node."""
            return " ".join(node.itertext()).strip()

        def _has_structural_descendant(node: Any) -> bool:
            """Check if a node has a structural descendant."""
            return any(
                _is_structural(child) or _has_structural_descendant(child)
                for child in node
            )

        def _leaf_structural_nodes(node: Any) -> List:
            """Get the leaf structural nodes of a node."""
            if _is_structural(node) and not _has_structural_descendant(node):
                return [node]

            leaf_nodes: List = []
            for child in node:
                leaf_nodes.extend(_leaf_structural_nodes(child))
            return leaf_nodes

        def _create_doc(node: Any, text: str) -> Document:
            """Create a Document from a node and text."""
            metadata = {
                XPATH_KEY: _xpath_for_chunk(node),
                DOCUMENT_ID_KEY: document[DOCUMENT_ID_KEY],
                DOCUMENT_NAME_KEY: document[DOCUMENT_NAME_KEY],
                DOCUMENT_SOURCE_KEY: document[DOCUMENT_NAME_KEY],
                STRUCTURE_KEY: node.attrib.get("structure", ""),
                TAG_KEY: re.sub(r"\{.*\}", "", node.tag),
            }

            if doc_metadata:
                metadata.update(doc_metadata)

            return Document(
                page_content=text,
                metadata=metadata,
            )

        # parse the tree and return chunks
        tree = etree.parse(io.BytesIO(content))
        root = tree.getroot()

        chunks: List[Document] = []
        pending_text: Optional[str] = None
        pending_node: Any = None
        for node in _leaf_structural_nodes(root):
            text = _get_text(node)
            if pending_text:
                text = pending_text + " " + text
                pending_text = None

            if _is_heading(node) or len(text) < self.min_chunk_size:
                # Save headings or other small chunks to be appended to the next chunk
                pending_text = text
                pending_node = node
            else:
                chunks.append(_create_doc(node, text))

        if pending_text:
            if chunks:
                # small chunk at the end left over, just append to last chunk
                chunks[-1].page_content += " " + pending_text
            else:
                # The whole document consisted of headings / small chunks:
                # emit the accumulated text rather than silently dropping it.
                chunks.append(_create_doc(pending_node, pending_text))

        return chunks

    def _auth_headers(self) -> Dict[str, str]:
        """Build the bearer-token header sent with every API request."""
        return {"Authorization": f"Bearer {self.access_token}"}

    def _fetch_all_pages(self, url: str, items_key: str) -> List[Dict]:
        """Collect ``items_key`` entries from a paginated API listing.

        Requests ``url`` and keeps following the ``next`` URL found in each
        JSON response until it is missing or empty.

        Raises:
            Exception: If any page responds with a non-OK status.
        """
        items: List[Dict] = []
        next_url: Optional[str] = url
        while next_url:
            response = requests.get(next_url, headers=self._auth_headers())
            if not response.ok:
                raise Exception(
                    f"Failed to download {next_url} (status: {response.status_code})"
                )
            data = response.json()
            items.extend(data[items_key])
            next_url = data.get("next", None)
        return items

    def _document_details_for_docset_id(self, docset_id: str) -> List[Dict]:
        """Gets all document details for the given docset ID"""
        return self._fetch_all_pages(
            f"{self.api}/docsets/{docset_id}/documents", "documents"
        )

    def _project_details_for_docset_id(self, docset_id: str) -> List[Dict]:
        """Gets all project details for the given docset ID"""
        return self._fetch_all_pages(
            f"{self.api}/projects?docset.id={docset_id}", PROJECTS_KEY
        )

    def _metadata_for_project(self, project: Dict) -> Dict:
        """Gets project metadata for all files

        Downloads every ``report-values.xml`` artifact of the project's latest
        artifacts and reads its ``pr:Entry`` elements.

        Returns:
            Mapping of document ID to a ``{heading: value}`` dict.

        Raises:
            ImportError: If ``lxml`` is not installed.
            Exception: If any download responds with a non-OK status.
        """
        # Import once up front instead of once per artifact.
        try:
            from lxml import etree
        except ImportError as exc:
            raise ImportError(
                "Could not import lxml python package. "
                "Please install it with `pip install lxml`."
            ) from exc

        project_id = project.get("id")
        all_artifacts = self._fetch_all_pages(
            f"{self.api}/projects/{project_id}/artifacts/latest", "artifacts"
        )

        per_file_metadata: Dict = {}
        for artifact in all_artifacts:
            artifact_name = artifact.get("name")
            artifact_url = artifact.get("url")
            artifact_doc = artifact.get("document")

            if not (
                artifact_name == "report-values.xml" and artifact_url and artifact_doc
            ):
                continue

            doc_id = artifact_doc["id"]
            metadata: Dict = {}

            # the evaluated XML for each document is named after the project
            response = requests.get(
                f"{artifact_url}/content", headers=self._auth_headers()
            )
            if not response.ok:
                raise Exception(
                    f"Failed to download {artifact_url}/content "
                    f"(status: {response.status_code})"
                )

            artifact_tree = etree.parse(io.BytesIO(response.content))
            artifact_root = artifact_tree.getroot()
            # A default namespace shows up in nsmap under the key None, which
            # is not a usable XPath prefix, so leave it out of the mapping.
            ns = {
                prefix: uri
                for prefix, uri in artifact_root.nsmap.items()
                if prefix is not None
            }
            entries = artifact_root.xpath("//pr:Entry", namespaces=ns)
            for entry in entries:
                heading = entry.xpath("./pr:Heading", namespaces=ns)[0].text
                value = " ".join(
                    entry.xpath("./pr:Value", namespaces=ns)[0].itertext()
                ).strip()
                metadata[heading] = value
            per_file_metadata[doc_id] = metadata

        return per_file_metadata

    def _load_chunks_for_document(
        self, docset_id: str, document: Dict, doc_metadata: Optional[Dict] = None
    ) -> List[Document]:
        """Load chunks for a document.

        Downloads the document's DGML and parses it with ``_parse_dgml``.

        Raises:
            Exception: If the download responds with a non-OK status.
        """
        document_id = document["id"]
        url = f"{self.api}/docsets/{docset_id}/documents/{document_id}/dgml"

        response = requests.get(url, headers=self._auth_headers())
        if not response.ok:
            raise Exception(
                f"Failed to download {url} (status: {response.status_code})"
            )

        return self._parse_dgml(document, response.content, doc_metadata)

    def load(self) -> List[Document]:
        """Load documents.

        Returns:
            All chunks from every selected document, in listing order.
        """
        chunks: List[Document] = []

        if self.access_token and self.docset_id:
            # remote mode
            _document_details = self._document_details_for_docset_id(self.docset_id)
            if self.document_ids:
                wanted_ids = set(self.document_ids)
                _document_details = [
                    d for d in _document_details if d["id"] in wanted_ids
                ]

            _project_details = self._project_details_for_docset_id(self.docset_id)
            combined_project_metadata: Dict = {}
            # if there are any projects for this docset, load project metadata;
            # for a document present in several projects the last one wins
            for project in _project_details or []:
                combined_project_metadata.update(self._metadata_for_project(project))

            for doc in _document_details:
                doc_metadata = combined_project_metadata.get(doc["id"])
                chunks += self._load_chunks_for_document(
                    self.docset_id, doc, doc_metadata
                )
        elif self.file_paths:
            # local mode (for integration testing, or pre-downloaded XML)
            for path in self.file_paths:
                path = Path(path)
                with open(path, "rb") as file:
                    chunks += self._parse_dgml(
                        {
                            DOCUMENT_ID_KEY: path.name,
                            DOCUMENT_NAME_KEY: path.name,
                        },
                        file.read(),
                    )

        return chunks