Spaces:
Runtime error
Runtime error
| import io | |
| import logging | |
| import os | |
| import re | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Mapping, Optional, Sequence, Union | |
| import requests | |
| from langchain_core.pydantic_v1 import BaseModel, root_validator | |
| from langchain.docstore.document import Document | |
| from langchain.document_loaders.base import BaseLoader | |
| TD_NAME = "{http://www.w3.org/1999/xhtml}td" | |
| TABLE_NAME = "{http://www.w3.org/1999/xhtml}table" | |
| XPATH_KEY = "xpath" | |
| DOCUMENT_ID_KEY = "id" | |
| DOCUMENT_SOURCE_KEY = "source" | |
| DOCUMENT_NAME_KEY = "name" | |
| STRUCTURE_KEY = "structure" | |
| TAG_KEY = "tag" | |
| PROJECTS_KEY = "projects" | |
| DEFAULT_API_ENDPOINT = "https://api.docugami.com/v1preview1" | |
| logger = logging.getLogger(__name__) | |
| class DocugamiLoader(BaseLoader, BaseModel): | |
| """Load from `Docugami`. | |
| To use, you should have the ``lxml`` python package installed. | |
| """ | |
| api: str = DEFAULT_API_ENDPOINT | |
| """The Docugami API endpoint to use.""" | |
| access_token: Optional[str] = os.environ.get("DOCUGAMI_API_KEY") | |
| """The Docugami API access token to use.""" | |
| docset_id: Optional[str] | |
| """The Docugami API docset ID to use.""" | |
| document_ids: Optional[Sequence[str]] | |
| """The Docugami API document IDs to use.""" | |
| file_paths: Optional[Sequence[Union[Path, str]]] | |
| """The local file paths to use.""" | |
| min_chunk_size: int = 32 # appended to the next chunk to avoid over-chunking | |
| """The minimum chunk size to use when parsing DGML. Defaults to 32.""" | |
| def validate_local_or_remote(cls, values: Dict[str, Any]) -> Dict[str, Any]: | |
| """Validate that either local file paths are given, or remote API docset ID. | |
| Args: | |
| values: The values to validate. | |
| Returns: | |
| The validated values. | |
| """ | |
| if values.get("file_paths") and values.get("docset_id"): | |
| raise ValueError("Cannot specify both file_paths and remote API docset_id") | |
| if not values.get("file_paths") and not values.get("docset_id"): | |
| raise ValueError("Must specify either file_paths or remote API docset_id") | |
| if values.get("docset_id") and not values.get("access_token"): | |
| raise ValueError("Must specify access token if using remote API docset_id") | |
| return values | |
| def _parse_dgml( | |
| self, document: Mapping, content: bytes, doc_metadata: Optional[Mapping] = None | |
| ) -> List[Document]: | |
| """Parse a single DGML document into a list of Documents.""" | |
| try: | |
| from lxml import etree | |
| except ImportError: | |
| raise ImportError( | |
| "Could not import lxml python package. " | |
| "Please install it with `pip install lxml`." | |
| ) | |
| # helpers | |
| def _xpath_qname_for_chunk(chunk: Any) -> str: | |
| """Get the xpath qname for a chunk.""" | |
| qname = f"{chunk.prefix}:{chunk.tag.split('}')[-1]}" | |
| parent = chunk.getparent() | |
| if parent is not None: | |
| doppelgangers = [x for x in parent if x.tag == chunk.tag] | |
| if len(doppelgangers) > 1: | |
| idx_of_self = doppelgangers.index(chunk) | |
| qname = f"{qname}[{idx_of_self + 1}]" | |
| return qname | |
| def _xpath_for_chunk(chunk: Any) -> str: | |
| """Get the xpath for a chunk.""" | |
| ancestor_chain = chunk.xpath("ancestor-or-self::*") | |
| return "/" + "/".join(_xpath_qname_for_chunk(x) for x in ancestor_chain) | |
| def _structure_value(node: Any) -> str: | |
| """Get the structure value for a node.""" | |
| structure = ( | |
| "table" | |
| if node.tag == TABLE_NAME | |
| else node.attrib["structure"] | |
| if "structure" in node.attrib | |
| else None | |
| ) | |
| return structure | |
| def _is_structural(node: Any) -> bool: | |
| """Check if a node is structural.""" | |
| return _structure_value(node) is not None | |
| def _is_heading(node: Any) -> bool: | |
| """Check if a node is a heading.""" | |
| structure = _structure_value(node) | |
| return structure is not None and structure.lower().startswith("h") | |
| def _get_text(node: Any) -> str: | |
| """Get the text of a node.""" | |
| return " ".join(node.itertext()).strip() | |
| def _has_structural_descendant(node: Any) -> bool: | |
| """Check if a node has a structural descendant.""" | |
| for child in node: | |
| if _is_structural(child) or _has_structural_descendant(child): | |
| return True | |
| return False | |
| def _leaf_structural_nodes(node: Any) -> List: | |
| """Get the leaf structural nodes of a node.""" | |
| if _is_structural(node) and not _has_structural_descendant(node): | |
| return [node] | |
| else: | |
| leaf_nodes = [] | |
| for child in node: | |
| leaf_nodes.extend(_leaf_structural_nodes(child)) | |
| return leaf_nodes | |
| def _create_doc(node: Any, text: str) -> Document: | |
| """Create a Document from a node and text.""" | |
| metadata = { | |
| XPATH_KEY: _xpath_for_chunk(node), | |
| DOCUMENT_ID_KEY: document[DOCUMENT_ID_KEY], | |
| DOCUMENT_NAME_KEY: document[DOCUMENT_NAME_KEY], | |
| DOCUMENT_SOURCE_KEY: document[DOCUMENT_NAME_KEY], | |
| STRUCTURE_KEY: node.attrib.get("structure", ""), | |
| TAG_KEY: re.sub(r"\{.*\}", "", node.tag), | |
| } | |
| if doc_metadata: | |
| metadata.update(doc_metadata) | |
| return Document( | |
| page_content=text, | |
| metadata=metadata, | |
| ) | |
| # parse the tree and return chunks | |
| tree = etree.parse(io.BytesIO(content)) | |
| root = tree.getroot() | |
| chunks: List[Document] = [] | |
| prev_small_chunk_text = None | |
| for node in _leaf_structural_nodes(root): | |
| text = _get_text(node) | |
| if prev_small_chunk_text: | |
| text = prev_small_chunk_text + " " + text | |
| prev_small_chunk_text = None | |
| if _is_heading(node) or len(text) < self.min_chunk_size: | |
| # Save headings or other small chunks to be appended to the next chunk | |
| prev_small_chunk_text = text | |
| else: | |
| chunks.append(_create_doc(node, text)) | |
| if prev_small_chunk_text and len(chunks) > 0: | |
| # small chunk at the end left over, just append to last chunk | |
| chunks[-1].page_content += " " + prev_small_chunk_text | |
| return chunks | |
| def _document_details_for_docset_id(self, docset_id: str) -> List[Dict]: | |
| """Gets all document details for the given docset ID""" | |
| url = f"{self.api}/docsets/{docset_id}/documents" | |
| all_documents = [] | |
| while url: | |
| response = requests.get( | |
| url, | |
| headers={"Authorization": f"Bearer {self.access_token}"}, | |
| ) | |
| if response.ok: | |
| data = response.json() | |
| all_documents.extend(data["documents"]) | |
| url = data.get("next", None) | |
| else: | |
| raise Exception( | |
| f"Failed to download {url} (status: {response.status_code})" | |
| ) | |
| return all_documents | |
| def _project_details_for_docset_id(self, docset_id: str) -> List[Dict]: | |
| """Gets all project details for the given docset ID""" | |
| url = f"{self.api}/projects?docset.id={docset_id}" | |
| all_projects = [] | |
| while url: | |
| response = requests.request( | |
| "GET", | |
| url, | |
| headers={"Authorization": f"Bearer {self.access_token}"}, | |
| data={}, | |
| ) | |
| if response.ok: | |
| data = response.json() | |
| all_projects.extend(data["projects"]) | |
| url = data.get("next", None) | |
| else: | |
| raise Exception( | |
| f"Failed to download {url} (status: {response.status_code})" | |
| ) | |
| return all_projects | |
| def _metadata_for_project(self, project: Dict) -> Dict: | |
| """Gets project metadata for all files""" | |
| project_id = project.get("id") | |
| url = f"{self.api}/projects/{project_id}/artifacts/latest" | |
| all_artifacts = [] | |
| while url: | |
| response = requests.request( | |
| "GET", | |
| url, | |
| headers={"Authorization": f"Bearer {self.access_token}"}, | |
| data={}, | |
| ) | |
| if response.ok: | |
| data = response.json() | |
| all_artifacts.extend(data["artifacts"]) | |
| url = data.get("next", None) | |
| else: | |
| raise Exception( | |
| f"Failed to download {url} (status: {response.status_code})" | |
| ) | |
| per_file_metadata = {} | |
| for artifact in all_artifacts: | |
| artifact_name = artifact.get("name") | |
| artifact_url = artifact.get("url") | |
| artifact_doc = artifact.get("document") | |
| if artifact_name == "report-values.xml" and artifact_url and artifact_doc: | |
| doc_id = artifact_doc["id"] | |
| metadata: Dict = {} | |
| # the evaluated XML for each document is named after the project | |
| response = requests.request( | |
| "GET", | |
| f"{artifact_url}/content", | |
| headers={"Authorization": f"Bearer {self.access_token}"}, | |
| data={}, | |
| ) | |
| if response.ok: | |
| try: | |
| from lxml import etree | |
| except ImportError: | |
| raise ImportError( | |
| "Could not import lxml python package. " | |
| "Please install it with `pip install lxml`." | |
| ) | |
| artifact_tree = etree.parse(io.BytesIO(response.content)) | |
| artifact_root = artifact_tree.getroot() | |
| ns = artifact_root.nsmap | |
| entries = artifact_root.xpath("//pr:Entry", namespaces=ns) | |
| for entry in entries: | |
| heading = entry.xpath("./pr:Heading", namespaces=ns)[0].text | |
| value = " ".join( | |
| entry.xpath("./pr:Value", namespaces=ns)[0].itertext() | |
| ).strip() | |
| metadata[heading] = value | |
| per_file_metadata[doc_id] = metadata | |
| else: | |
| raise Exception( | |
| f"Failed to download {artifact_url}/content " | |
| + "(status: {response.status_code})" | |
| ) | |
| return per_file_metadata | |
| def _load_chunks_for_document( | |
| self, docset_id: str, document: Dict, doc_metadata: Optional[Dict] = None | |
| ) -> List[Document]: | |
| """Load chunks for a document.""" | |
| document_id = document["id"] | |
| url = f"{self.api}/docsets/{docset_id}/documents/{document_id}/dgml" | |
| response = requests.request( | |
| "GET", | |
| url, | |
| headers={"Authorization": f"Bearer {self.access_token}"}, | |
| data={}, | |
| ) | |
| if response.ok: | |
| return self._parse_dgml(document, response.content, doc_metadata) | |
| else: | |
| raise Exception( | |
| f"Failed to download {url} (status: {response.status_code})" | |
| ) | |
| def load(self) -> List[Document]: | |
| """Load documents.""" | |
| chunks: List[Document] = [] | |
| if self.access_token and self.docset_id: | |
| # remote mode | |
| _document_details = self._document_details_for_docset_id(self.docset_id) | |
| if self.document_ids: | |
| _document_details = [ | |
| d for d in _document_details if d["id"] in self.document_ids | |
| ] | |
| _project_details = self._project_details_for_docset_id(self.docset_id) | |
| combined_project_metadata = {} | |
| if _project_details: | |
| # if there are any projects for this docset, load project metadata | |
| for project in _project_details: | |
| metadata = self._metadata_for_project(project) | |
| combined_project_metadata.update(metadata) | |
| for doc in _document_details: | |
| doc_metadata = combined_project_metadata.get(doc["id"]) | |
| chunks += self._load_chunks_for_document( | |
| self.docset_id, doc, doc_metadata | |
| ) | |
| elif self.file_paths: | |
| # local mode (for integration testing, or pre-downloaded XML) | |
| for path in self.file_paths: | |
| path = Path(path) | |
| with open(path, "rb") as file: | |
| chunks += self._parse_dgml( | |
| { | |
| DOCUMENT_ID_KEY: path.name, | |
| DOCUMENT_NAME_KEY: path.name, | |
| }, | |
| file.read(), | |
| ) | |
| return chunks | |