zhangyi617's picture
Upload folder using huggingface_hub
129cd69
import io
import logging
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional, Sequence, Union
import requests
from langchain_core.pydantic_v1 import BaseModel, root_validator
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
TD_NAME = "{http://www.w3.org/1999/xhtml}td"
TABLE_NAME = "{http://www.w3.org/1999/xhtml}table"
XPATH_KEY = "xpath"
DOCUMENT_ID_KEY = "id"
DOCUMENT_SOURCE_KEY = "source"
DOCUMENT_NAME_KEY = "name"
STRUCTURE_KEY = "structure"
TAG_KEY = "tag"
PROJECTS_KEY = "projects"
DEFAULT_API_ENDPOINT = "https://api.docugami.com/v1preview1"
logger = logging.getLogger(__name__)
class DocugamiLoader(BaseLoader, BaseModel):
"""Load from `Docugami`.
To use, you should have the ``lxml`` python package installed.
"""
api: str = DEFAULT_API_ENDPOINT
"""The Docugami API endpoint to use."""
access_token: Optional[str] = os.environ.get("DOCUGAMI_API_KEY")
"""The Docugami API access token to use."""
docset_id: Optional[str]
"""The Docugami API docset ID to use."""
document_ids: Optional[Sequence[str]]
"""The Docugami API document IDs to use."""
file_paths: Optional[Sequence[Union[Path, str]]]
"""The local file paths to use."""
min_chunk_size: int = 32 # appended to the next chunk to avoid over-chunking
"""The minimum chunk size to use when parsing DGML. Defaults to 32."""
@root_validator
def validate_local_or_remote(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validate that either local file paths are given, or remote API docset ID.
Args:
values: The values to validate.
Returns:
The validated values.
"""
if values.get("file_paths") and values.get("docset_id"):
raise ValueError("Cannot specify both file_paths and remote API docset_id")
if not values.get("file_paths") and not values.get("docset_id"):
raise ValueError("Must specify either file_paths or remote API docset_id")
if values.get("docset_id") and not values.get("access_token"):
raise ValueError("Must specify access token if using remote API docset_id")
return values
def _parse_dgml(
self, document: Mapping, content: bytes, doc_metadata: Optional[Mapping] = None
) -> List[Document]:
"""Parse a single DGML document into a list of Documents."""
try:
from lxml import etree
except ImportError:
raise ImportError(
"Could not import lxml python package. "
"Please install it with `pip install lxml`."
)
# helpers
def _xpath_qname_for_chunk(chunk: Any) -> str:
"""Get the xpath qname for a chunk."""
qname = f"{chunk.prefix}:{chunk.tag.split('}')[-1]}"
parent = chunk.getparent()
if parent is not None:
doppelgangers = [x for x in parent if x.tag == chunk.tag]
if len(doppelgangers) > 1:
idx_of_self = doppelgangers.index(chunk)
qname = f"{qname}[{idx_of_self + 1}]"
return qname
def _xpath_for_chunk(chunk: Any) -> str:
"""Get the xpath for a chunk."""
ancestor_chain = chunk.xpath("ancestor-or-self::*")
return "/" + "/".join(_xpath_qname_for_chunk(x) for x in ancestor_chain)
def _structure_value(node: Any) -> str:
"""Get the structure value for a node."""
structure = (
"table"
if node.tag == TABLE_NAME
else node.attrib["structure"]
if "structure" in node.attrib
else None
)
return structure
def _is_structural(node: Any) -> bool:
"""Check if a node is structural."""
return _structure_value(node) is not None
def _is_heading(node: Any) -> bool:
"""Check if a node is a heading."""
structure = _structure_value(node)
return structure is not None and structure.lower().startswith("h")
def _get_text(node: Any) -> str:
"""Get the text of a node."""
return " ".join(node.itertext()).strip()
def _has_structural_descendant(node: Any) -> bool:
"""Check if a node has a structural descendant."""
for child in node:
if _is_structural(child) or _has_structural_descendant(child):
return True
return False
def _leaf_structural_nodes(node: Any) -> List:
"""Get the leaf structural nodes of a node."""
if _is_structural(node) and not _has_structural_descendant(node):
return [node]
else:
leaf_nodes = []
for child in node:
leaf_nodes.extend(_leaf_structural_nodes(child))
return leaf_nodes
def _create_doc(node: Any, text: str) -> Document:
"""Create a Document from a node and text."""
metadata = {
XPATH_KEY: _xpath_for_chunk(node),
DOCUMENT_ID_KEY: document[DOCUMENT_ID_KEY],
DOCUMENT_NAME_KEY: document[DOCUMENT_NAME_KEY],
DOCUMENT_SOURCE_KEY: document[DOCUMENT_NAME_KEY],
STRUCTURE_KEY: node.attrib.get("structure", ""),
TAG_KEY: re.sub(r"\{.*\}", "", node.tag),
}
if doc_metadata:
metadata.update(doc_metadata)
return Document(
page_content=text,
metadata=metadata,
)
# parse the tree and return chunks
tree = etree.parse(io.BytesIO(content))
root = tree.getroot()
chunks: List[Document] = []
prev_small_chunk_text = None
for node in _leaf_structural_nodes(root):
text = _get_text(node)
if prev_small_chunk_text:
text = prev_small_chunk_text + " " + text
prev_small_chunk_text = None
if _is_heading(node) or len(text) < self.min_chunk_size:
# Save headings or other small chunks to be appended to the next chunk
prev_small_chunk_text = text
else:
chunks.append(_create_doc(node, text))
if prev_small_chunk_text and len(chunks) > 0:
# small chunk at the end left over, just append to last chunk
chunks[-1].page_content += " " + prev_small_chunk_text
return chunks
def _document_details_for_docset_id(self, docset_id: str) -> List[Dict]:
"""Gets all document details for the given docset ID"""
url = f"{self.api}/docsets/{docset_id}/documents"
all_documents = []
while url:
response = requests.get(
url,
headers={"Authorization": f"Bearer {self.access_token}"},
)
if response.ok:
data = response.json()
all_documents.extend(data["documents"])
url = data.get("next", None)
else:
raise Exception(
f"Failed to download {url} (status: {response.status_code})"
)
return all_documents
def _project_details_for_docset_id(self, docset_id: str) -> List[Dict]:
"""Gets all project details for the given docset ID"""
url = f"{self.api}/projects?docset.id={docset_id}"
all_projects = []
while url:
response = requests.request(
"GET",
url,
headers={"Authorization": f"Bearer {self.access_token}"},
data={},
)
if response.ok:
data = response.json()
all_projects.extend(data["projects"])
url = data.get("next", None)
else:
raise Exception(
f"Failed to download {url} (status: {response.status_code})"
)
return all_projects
def _metadata_for_project(self, project: Dict) -> Dict:
"""Gets project metadata for all files"""
project_id = project.get("id")
url = f"{self.api}/projects/{project_id}/artifacts/latest"
all_artifacts = []
while url:
response = requests.request(
"GET",
url,
headers={"Authorization": f"Bearer {self.access_token}"},
data={},
)
if response.ok:
data = response.json()
all_artifacts.extend(data["artifacts"])
url = data.get("next", None)
else:
raise Exception(
f"Failed to download {url} (status: {response.status_code})"
)
per_file_metadata = {}
for artifact in all_artifacts:
artifact_name = artifact.get("name")
artifact_url = artifact.get("url")
artifact_doc = artifact.get("document")
if artifact_name == "report-values.xml" and artifact_url and artifact_doc:
doc_id = artifact_doc["id"]
metadata: Dict = {}
# the evaluated XML for each document is named after the project
response = requests.request(
"GET",
f"{artifact_url}/content",
headers={"Authorization": f"Bearer {self.access_token}"},
data={},
)
if response.ok:
try:
from lxml import etree
except ImportError:
raise ImportError(
"Could not import lxml python package. "
"Please install it with `pip install lxml`."
)
artifact_tree = etree.parse(io.BytesIO(response.content))
artifact_root = artifact_tree.getroot()
ns = artifact_root.nsmap
entries = artifact_root.xpath("//pr:Entry", namespaces=ns)
for entry in entries:
heading = entry.xpath("./pr:Heading", namespaces=ns)[0].text
value = " ".join(
entry.xpath("./pr:Value", namespaces=ns)[0].itertext()
).strip()
metadata[heading] = value
per_file_metadata[doc_id] = metadata
else:
raise Exception(
f"Failed to download {artifact_url}/content "
+ "(status: {response.status_code})"
)
return per_file_metadata
def _load_chunks_for_document(
self, docset_id: str, document: Dict, doc_metadata: Optional[Dict] = None
) -> List[Document]:
"""Load chunks for a document."""
document_id = document["id"]
url = f"{self.api}/docsets/{docset_id}/documents/{document_id}/dgml"
response = requests.request(
"GET",
url,
headers={"Authorization": f"Bearer {self.access_token}"},
data={},
)
if response.ok:
return self._parse_dgml(document, response.content, doc_metadata)
else:
raise Exception(
f"Failed to download {url} (status: {response.status_code})"
)
def load(self) -> List[Document]:
"""Load documents."""
chunks: List[Document] = []
if self.access_token and self.docset_id:
# remote mode
_document_details = self._document_details_for_docset_id(self.docset_id)
if self.document_ids:
_document_details = [
d for d in _document_details if d["id"] in self.document_ids
]
_project_details = self._project_details_for_docset_id(self.docset_id)
combined_project_metadata = {}
if _project_details:
# if there are any projects for this docset, load project metadata
for project in _project_details:
metadata = self._metadata_for_project(project)
combined_project_metadata.update(metadata)
for doc in _document_details:
doc_metadata = combined_project_metadata.get(doc["id"])
chunks += self._load_chunks_for_document(
self.docset_id, doc, doc_metadata
)
elif self.file_paths:
# local mode (for integration testing, or pre-downloaded XML)
for path in self.file_paths:
path = Path(path)
with open(path, "rb") as file:
chunks += self._parse_dgml(
{
DOCUMENT_ID_KEY: path.name,
DOCUMENT_NAME_KEY: path.name,
},
file.read(),
)
return chunks