Spaces:
Runtime error
Runtime error
import logging | |
from enum import Enum | |
from io import BytesIO | |
from typing import Any, Callable, Dict, List, Optional, Union | |
import requests | |
from tenacity import ( | |
before_sleep_log, | |
retry, | |
stop_after_attempt, | |
wait_exponential, | |
) | |
from langchain.docstore.document import Document | |
from langchain.document_loaders.base import BaseLoader | |
# Module-level logger; the retry wrappers in this file report failed attempts
# through it (see the `before_sleep_log(logger, logging.WARNING)` calls).
logger = logging.getLogger(__name__)
class ContentFormat(str, Enum):
    """Content formats in which a Confluence page body can be requested.

    Each member's value is the `expand` path sent to the API (for example
    ``"body.storage"``); the lower-cased member name is the key under
    ``page["body"]`` from which the content is read back.
    """

    EDITOR = "body.editor"
    EXPORT_VIEW = "body.export_view"
    ANONYMOUS_EXPORT_VIEW = "body.anonymous_export_view"
    STORAGE = "body.storage"
    VIEW = "body.view"

    def get_content(self, page: dict) -> str:
        """Return the body of ``page`` rendered in this format.

        :param page: Page dict holding a ``"body"`` mapping keyed by format name
        :return: The ``"value"`` stored under this format's key
        :raises KeyError: If the page does not carry this format
        """
        body_key = self.name.lower()
        body = page["body"]
        return body[body_key]["value"]
class ConfluenceLoader(BaseLoader):
    """Load `Confluence` pages.

    Port of https://llamahub.ai/l/confluence

    This currently supports username/api_key, OAuth2 login, a personal access
    token, or an existing `requests.Session` for authentication. Only one of
    these may be supplied at a time.

    Specify a list of `page_ids` and/or a `space_key` to load the corresponding
    pages into Document objects; if both are specified, the union of both sets
    will be returned.

    You can also specify a boolean `include_attachments` to include attachments.
    It is set to False by default; if set to True, all attachments will be
    downloaded and the loader will extract the text from the attachments and add
    it to the Document object. Currently supported attachment types are: PDF,
    PNG, JPEG/JPG, SVG, Word and Excel.

    The Confluence API supports different formats of page content. The storage
    format is the raw XML representation used for storage. The view format is the
    HTML representation for viewing, with macros rendered as though the page is
    viewed by users. You can pass an enum `content_format` argument to `load()`
    to specify the content format; this is set to `ContentFormat.STORAGE` by
    default. The supported values are: `ContentFormat.EDITOR`,
    `ContentFormat.EXPORT_VIEW`, `ContentFormat.ANONYMOUS_EXPORT_VIEW`,
    `ContentFormat.STORAGE`, and `ContentFormat.VIEW`.

    Hint: space_key and page_id can both be found in the URL of a page in
    Confluence - https://yoursite.atlassian.com/wiki/spaces/<space_key>/pages/<page_id>

    Example:
        .. code-block:: python

            from langchain.document_loaders import ConfluenceLoader

            loader = ConfluenceLoader(
                url="https://yoursite.atlassian.com/wiki",
                username="me",
                api_key="12345"
            )
            documents = loader.load(space_key="SPACE",limit=50)

            # Server on prem
            loader = ConfluenceLoader(
                url="https://confluence.yoursite.com/",
                username="me",
                api_key="your_password",
                cloud=False
            )
            documents = loader.load(space_key="SPACE",limit=50)

    :param url: Base URL of the Confluence instance (see the examples above)
    :type url: str
    :param api_key: API key (or password, as in the on-prem example) used
        together with `username`; it is handed to the Confluence client as the
        password, defaults to None
    :type api_key: str, optional
    :param username: User name used together with `api_key`, defaults to None
    :type username: str, optional
    :param session: Existing `requests.Session` handed to the Confluence client
        instead of credentials, defaults to None
    :type session: requests.Session, optional
    :param oauth2: OAuth2 credentials; the dict must contain exactly the keys
        `access_token`, `access_token_secret`, `consumer_key` and `key_cert`,
        defaults to None
    :type oauth2: dict, optional
    :param token: Personal access token, defaults to None
    :type token: str, optional
    :param cloud: Passed through to the Confluence client; the on-prem example
        above sets it to False, defaults to True
    :type cloud: bool, optional
    :param number_of_retries: How many times to retry, defaults to 3
    :type number_of_retries: Optional[int], optional
    :param min_retry_seconds: Minimum of the exponential wait between retries,
        defaults to 2
    :type min_retry_seconds: Optional[int], optional
    :param max_retry_seconds: Maximum of the exponential wait between retries,
        defaults to 10
    :type max_retry_seconds: Optional[int], optional
    :param confluence_kwargs: additional kwargs to initialize confluence with
    :type confluence_kwargs: dict, optional
    :raises ValueError: Errors while validating input
    :raises ImportError: Required dependencies not installed.
    """
def __init__(
    self,
    url: str,
    api_key: Optional[str] = None,
    username: Optional[str] = None,
    session: Optional[requests.Session] = None,
    oauth2: Optional[dict] = None,
    token: Optional[str] = None,
    cloud: Optional[bool] = True,
    number_of_retries: Optional[int] = 3,
    min_retry_seconds: Optional[int] = 2,
    max_retry_seconds: Optional[int] = 10,
    confluence_kwargs: Optional[dict] = None,
):
    """Validate the arguments and build the underlying Confluence client.

    The authentication method is chosen in this order of precedence:
    `session`, `oauth2`, `token`, and finally `username`/`api_key`.

    :raises ValueError: If `validate_init_args` reports any problem
    :raises ImportError: If the `atlassian` package cannot be imported
    """
    confluence_kwargs = confluence_kwargs or {}

    validation_errors = ConfluenceLoader.validate_init_args(
        url=url,
        api_key=api_key,
        username=username,
        session=session,
        oauth2=oauth2,
        token=token,
    )
    if validation_errors:
        raise ValueError(
            f"Error(s) while validating input: {validation_errors}"
        )

    # Imported lazily so the dependency is only required when a loader is built.
    try:
        from atlassian import Confluence
    except ImportError:
        raise ImportError(
            "`atlassian` package not found, please run "
            "`pip install atlassian-python-api`"
        )

    self.base_url = url
    self.number_of_retries = number_of_retries
    self.min_retry_seconds = min_retry_seconds
    self.max_retry_seconds = max_retry_seconds

    # Pick the keyword arguments that describe the chosen authentication method.
    # Note that the session variant does not forward `cloud`.
    if session:
        auth_kwargs = {"session": session}
    elif oauth2:
        auth_kwargs = {"oauth2": oauth2, "cloud": cloud}
    elif token:
        auth_kwargs = {"token": token, "cloud": cloud}
    else:
        auth_kwargs = {
            "username": username,
            "password": api_key,
            "cloud": cloud,
        }

    self.confluence = Confluence(url=url, **auth_kwargs, **confluence_kwargs)
def validate_init_args(
    url: Optional[str] = None,
    api_key: Optional[str] = None,
    username: Optional[str] = None,
    session: Optional["requests.Session"] = None,
    oauth2: Optional[dict] = None,
    token: Optional[str] = None,
) -> Union[List, None]:
    """Validate that the constructor arguments form a usable combination.

    The checks are:

    * `url` must be given;
    * `api_key` and `username` must be given together;
    * at most one authentication method may be given among
      (`api_key`, `username`), `session`, `oauth2` and `token`;
    * `oauth2`, when given, must contain exactly the four expected keys.

    NOTE(review): no `@staticmethod` decorator is visible above this
    definition and `__init__` calls it as
    `ConfluenceLoader.validate_init_args(...)` with keyword arguments; calling
    it on an instance would bind the instance to `url` - confirm whether the
    decorator was lost.

    :return: A list of human-readable error messages, or None when the
        arguments are valid
    """
    errors = []

    if url is None:
        errors.append("Must provide `url`")

    # Exactly one of the pair being set means the credentials are incomplete.
    if bool(api_key) != bool(username):
        errors.append(
            "If one of `api_key` or `username` is provided, "
            "the other must be as well."
        )

    # Each entry pairs the name shown to the user with the supplied value; the
    # username/api_key pair counts as a single authentication method.
    credential_groups = (
        ("(api_key, username)", api_key or username),
        ("session", session),
        ("oauth2", oauth2),
        ("token", token),
    )
    all_names = tuple(name for name, _ in credential_groups)
    provided = tuple(
        name for name, value in credential_groups if value is not None
    )
    if len(provided) > 1:
        errors.append(
            f"Cannot provide a value for more than one of: {all_names}. Received "
            f"values for: {provided}"
        )

    required_oauth2_keys = {
        "access_token",
        "access_token_secret",
        "consumer_key",
        "key_cert",
    }
    if oauth2 and set(oauth2.keys()) != required_oauth2_keys:
        errors.append(
            "You have either omitted required keys or added extra "
            "keys to the oauth2 dictionary. key values should be "
            "`['access_token', 'access_token_secret', 'consumer_key', 'key_cert']`"
        )

    return errors or None
def load(
    self,
    space_key: Optional[str] = None,
    page_ids: Optional[List[str]] = None,
    label: Optional[str] = None,
    cql: Optional[str] = None,
    include_restricted_content: bool = False,
    include_archived_content: bool = False,
    include_attachments: bool = False,
    include_comments: bool = False,
    content_format: ContentFormat = ContentFormat.STORAGE,
    limit: Optional[int] = 50,
    max_pages: Optional[int] = 1000,
    ocr_languages: Optional[str] = None,
    keep_markdown_format: bool = False,
    keep_newlines: bool = False,
) -> List[Document]:
    """Load Confluence pages selected by space, page IDs, label and/or CQL.

    Pages of `space_key` and pages matching `cql` are fetched in bulk. Pages
    carrying `label` are first resolved to IDs, merged (de-duplicated) with
    `page_ids`, and then fetched one by one with retries.

    :param space_key: Space key retrieved from a confluence URL, defaults to None
    :type space_key: Optional[str], optional
    :param page_ids: List of specific page IDs to load, defaults to None
    :type page_ids: Optional[List[str]], optional
    :param label: Get all pages with this label, defaults to None
    :type label: Optional[str], optional
    :param cql: CQL Expression, defaults to None
    :type cql: Optional[str], optional
    :param include_restricted_content: Whether to keep pages for which
        `is_public_page` returns False, defaults to False
    :type include_restricted_content: bool, optional
    :param include_archived_content: Whether to include archived content,
        defaults to False
    :type include_archived_content: bool, optional
    :param include_attachments: Whether to append the text extracted from
        attachments, defaults to False
    :type include_attachments: bool, optional
    :param include_comments: Whether to append the text of page comments,
        defaults to False
    :type include_comments: bool, optional
    :param content_format: Specify content format, defaults to
        ContentFormat.STORAGE, the supported values are:
        `ContentFormat.EDITOR`, `ContentFormat.EXPORT_VIEW`,
        `ContentFormat.ANONYMOUS_EXPORT_VIEW`,
        `ContentFormat.STORAGE`, and `ContentFormat.VIEW`.
    :type content_format: ContentFormat
    :param limit: Maximum number of pages to retrieve per request, defaults to 50
    :type limit: int, optional
    :param max_pages: Maximum number of pages to retrieve in total, applied
        separately to the space, label and CQL queries, defaults to 1000
    :type max_pages: int, optional
    :param ocr_languages: The languages to use for the Tesseract agent. To use a
        language, you'll first need to install the appropriate
        Tesseract language pack.
    :type ocr_languages: str, optional
    :param keep_markdown_format: Whether to keep the markdown format, defaults to
        False
    :type keep_markdown_format: bool
    :param keep_newlines: Whether to keep the newlines format, defaults to
        False
    :type keep_newlines: bool
    :raises ValueError: If none of `space_key`, `page_ids`, `label` and `cql`
        is given
    :raises ImportError: If an optional dependency needed while processing a
        page is not installed
    :return: Documents for the space pages, then the CQL results, then the
        individually fetched pages
    :rtype: List[Document]
    """
    if not space_key and not page_ids and not label and not cql:
        raise ValueError(
            "Must specify at least one among `space_key`, `page_ids`, "
            "`label`, `cql` parameters."
        )

    # Formatting options shared by every processing call below. Passing them
    # from one place keeps the three code paths in agreement (the CQL and
    # page-id paths previously dropped `keep_newlines`).
    processing_kwargs = {
        "ocr_languages": ocr_languages,
        "keep_markdown_format": keep_markdown_format,
        "keep_newlines": keep_newlines,
    }

    docs = []

    if space_key:
        pages = self.paginate_request(
            self.confluence.get_all_pages_from_space,
            space=space_key,
            limit=limit,
            max_pages=max_pages,
            status="any" if include_archived_content else "current",
            expand=content_format.value,
        )
        docs += self.process_pages(
            pages,
            include_restricted_content,
            include_attachments,
            include_comments,
            content_format,
            **processing_kwargs,
        )

    if label:
        pages = self.paginate_request(
            self.confluence.get_all_pages_by_label,
            label=label,
            limit=limit,
            max_pages=max_pages,
        )
        ids_by_label = [page["id"] for page in pages]
        # Labelled pages are loaded through the page-id path below; the set
        # removes IDs that were also requested explicitly.
        if page_ids:
            page_ids = list(set(page_ids + ids_by_label))
        else:
            page_ids = list(set(ids_by_label))

    if cql:
        pages = self.paginate_request(
            self._search_content_by_cql,
            cql=cql,
            limit=limit,
            max_pages=max_pages,
            include_archived_spaces=include_archived_content,
            expand=content_format.value,
        )
        docs += self.process_pages(
            pages,
            include_restricted_content,
            include_attachments,
            include_comments,
            content_format,
            **processing_kwargs,
        )

    if page_ids:
        # The retry wrapper is the same for every page, so build it once.
        get_page = retry(
            reraise=True,
            stop=stop_after_attempt(
                self.number_of_retries  # type: ignore[arg-type]
            ),
            wait=wait_exponential(
                multiplier=1,  # type: ignore[arg-type]
                min=self.min_retry_seconds,  # type: ignore[arg-type]
                max=self.max_retry_seconds,  # type: ignore[arg-type]
            ),
            before_sleep=before_sleep_log(logger, logging.WARNING),
        )(self.confluence.get_page_by_id)

        for page_id in page_ids:
            # `version` is expanded so that `process_page` can record the
            # `when` metadata entry.
            page = get_page(
                page_id=page_id, expand=f"{content_format.value},version"
            )
            if not include_restricted_content and not self.is_public_page(page):
                continue
            docs.append(
                self.process_page(
                    page,
                    include_attachments,
                    include_comments,
                    content_format,
                    **processing_kwargs,
                )
            )

    return docs
def _search_content_by_cql(
    self, cql: str, include_archived_spaces: Optional[bool] = None, **kwargs: Any
) -> List[dict]:
    """Run a CQL search and return the entries of its ``results`` list.

    :param cql: CQL expression to search with
    :param include_archived_spaces: Sent as ``includeArchivedSpaces`` unless None
    :param kwargs: Extra query parameters forwarded unchanged
    :return: The ``results`` entry of the response, or an empty list if absent
    """
    query: Dict[str, Any] = {"cql": cql, **kwargs}
    if include_archived_spaces is not None:
        query["includeArchivedSpaces"] = include_archived_spaces

    payload = self.confluence.get("rest/api/content/search", params=query)
    return payload.get("results", [])
def paginate_request(self, retrieval_method: Callable, **kwargs: Any) -> List:
    """Paginate the various methods to retrieve groups of pages.

    Unfortunately, due to page size, sometimes the Confluence API
    doesn't match the limit value. If `limit` is >100 confluence
    seems to cap the response to 100. Also, due to the Atlassian Python
    package, we don't get the "next" values from the "_links" key because
    they only return the value from the result key. So here, the pagination
    starts from 0 and goes until the max_pages, getting the `limit` number
    of pages with each request. We have to manually check if there
    are more docs based on the length of the returned list of pages, rather than
    just checking for the presence of a `next` key in the response like this page
    would have you do:
    https://developer.atlassian.com/server/confluence/pagination-in-the-rest-api/

    :param retrieval_method: Function used to retrieve docs; it is called with
        the remaining keyword arguments plus `start`, the number of items
        collected so far
    :type retrieval_method: callable
    :param kwargs: Must contain `max_pages` (the total cap; None means no cap);
        every other entry is forwarded to `retrieval_method`
    :raises KeyError: If `max_pages` is not supplied
    :return: List of documents, at most `max_pages` long
    :rtype: List
    """
    max_pages = kwargs.pop("max_pages")

    # The retry wrapper does not depend on the loop state, so build it once
    # instead of once per batch.
    get_pages = retry(
        reraise=True,
        stop=stop_after_attempt(
            self.number_of_retries  # type: ignore[arg-type]
        ),
        wait=wait_exponential(
            multiplier=1,
            min=self.min_retry_seconds,  # type: ignore[arg-type]
            max=self.max_retry_seconds,  # type: ignore[arg-type]
        ),
        before_sleep=before_sleep_log(logger, logging.WARNING),
    )(retrieval_method)

    docs: List[dict] = []
    # `max_pages` is typed Optional by `load`; treat None as "no cap" rather
    # than failing on the `int < None` comparison.
    while max_pages is None or len(docs) < max_pages:
        batch = get_pages(**kwargs, start=len(docs))
        if not batch:
            # An empty batch is the only available end-of-results signal.
            break
        docs.extend(batch)

    # The last batch may overshoot the cap; slicing with None keeps everything.
    return docs[:max_pages]
def is_public_page(self, page: dict) -> bool:
    """Tell whether a page is current and has no read restrictions.

    :param page: Page dict providing the ``"id"`` and ``"status"`` keys
    :return: True when the status is ``"current"`` and neither a user nor a
        group read restriction is listed for the page
    """
    restrictions = self.confluence.get_all_restrictions_for_content(page["id"])

    if page["status"] != "current":
        return False

    read_restrictions = restrictions["read"]["restrictions"]
    if read_restrictions["user"]["results"]:
        return False
    return not read_restrictions["group"]["results"]
def process_pages(
    self,
    pages: List[dict],
    include_restricted_content: bool,
    include_attachments: bool,
    include_comments: bool,
    content_format: "ContentFormat",
    ocr_languages: Optional[str] = None,
    keep_markdown_format: Optional[bool] = False,
    keep_newlines: bool = False,
) -> List["Document"]:
    """Turn a list of page dicts into documents, in the same order.

    Unless `include_restricted_content` is set, pages for which
    `is_public_page` returns False are left out. All remaining options are
    forwarded to `process_page` unchanged.
    """
    return [
        self.process_page(
            page,
            include_attachments,
            include_comments,
            content_format,
            ocr_languages=ocr_languages,
            keep_markdown_format=keep_markdown_format,
            keep_newlines=keep_newlines,
        )
        for page in pages
        # The restriction lookup is skipped entirely when restricted content
        # is wanted anyway.
        if include_restricted_content or self.is_public_page(page)
    ]
def process_page(
    self,
    page: dict,
    include_attachments: bool,
    include_comments: bool,
    content_format: ContentFormat,
    ocr_languages: Optional[str] = None,
    keep_markdown_format: Optional[bool] = False,
    keep_newlines: bool = False,
) -> Document:
    """Convert one Confluence page dict into a Document.

    The page body is rendered either as Markdown (`keep_markdown_format`) or
    as plain text extracted from the HTML. Attachment texts and then comment
    texts are appended directly after it when requested.

    :param page: Page dict with ``"id"``, ``"title"``, ``"_links"`` and the
        body in `content_format`
    :raises ImportError: If `markdownify` or `beautifulsoup4` is needed but
        not installed
    :return: Document whose metadata holds ``title``, ``id``, ``source`` and,
        when the page carries version information, ``when``
    """
    # Optional dependencies are imported only for the code paths that use them.
    if keep_markdown_format:
        try:
            from markdownify import markdownify
        except ImportError:
            raise ImportError(
                "`markdownify` package not found, please run "
                "`pip install markdownify`"
            )
    needs_html_parser = include_comments or not keep_markdown_format
    if needs_html_parser:
        try:
            from bs4 import BeautifulSoup  # type: ignore
        except ImportError:
            raise ImportError(
                "`beautifulsoup4` package not found, please run "
                "`pip install beautifulsoup4`"
            )

    attachment_texts = (
        self.process_attachment(page["id"], ocr_languages)
        if include_attachments
        else []
    )

    content = content_format.get_content(page)
    if keep_markdown_format:
        # Use markdownify to keep the page Markdown style
        body_text = markdownify(content, heading_style="ATX")
    elif keep_newlines:
        # Turn paragraph ends and line breaks into newlines before the tags
        # are stripped.
        html = content.replace("</p>", "\n</p>").replace("<br />", "\n")
        body_text = BeautifulSoup(html, "lxml").get_text(" ")
    else:
        body_text = BeautifulSoup(content, "lxml").get_text(" ", strip=True)
    text = body_text + "".join(attachment_texts)

    if include_comments:
        comments = self.confluence.get_page_comments(
            page["id"], expand="body.view.value", depth="all"
        )["results"]
        text = text + "".join(
            BeautifulSoup(comment["body"]["view"]["value"], "lxml").get_text(
                " ", strip=True
            )
            for comment in comments
        )

    metadata = {
        "title": page["title"],
        "id": page["id"],
        "source": self.base_url.strip("/") + page["_links"]["webui"],
    }
    # `version` is only present when the caller expanded it in the request.
    if "version" in page and "when" in page["version"]:
        metadata["when"] = page["version"]["when"]

    return Document(page_content=text, metadata=metadata)
def process_attachment(
    self,
    page_id: str,
    ocr_languages: Optional[str] = None,
) -> List[str]:
    """Extract text from every supported attachment of a page.

    Supported media types are PDF, PNG, JPG/JPEG, SVG, Word (`.docx`) and
    Excel (`application/vnd.ms-excel`); other attachments are skipped. Each
    returned entry is the attachment title followed directly by the extracted
    text.

    :param page_id: ID of the page whose attachments are processed
    :param ocr_languages: Languages forwarded to the OCR-based extractors
    :raises ImportError: If `Pillow` is not installed
    :raises requests.HTTPError: For HTTP errors other than 404, which is
        logged and skipped
    :return: One text per successfully processed attachment
    """
    try:
        from PIL import Image  # noqa: F401
    except ImportError:
        raise ImportError(
            "`Pillow` package not found, please run `pip install Pillow`"
        )

    # depending on setup you may also need to set the correct path for
    # poppler and tesseract
    attachments = self.confluence.get_attachments_from_content(page_id)["results"]

    # A trailing slash on the base URL would otherwise produce "//download/...".
    base_url = self.base_url.rstrip("/")
    image_types = ("image/png", "image/jpg", "image/jpeg")
    docx_type = (
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    )

    texts = []
    for attachment in attachments:
        media_type = attachment["metadata"]["mediaType"]
        absolute_url = base_url + attachment["_links"]["download"]
        title = attachment["title"]
        try:
            if media_type == "application/pdf":
                text = title + self.process_pdf(absolute_url, ocr_languages)
            elif media_type in image_types:
                text = title + self.process_image(absolute_url, ocr_languages)
            elif media_type == docx_type:
                text = title + self.process_doc(absolute_url)
            elif media_type == "application/vnd.ms-excel":
                text = title + self.process_xls(absolute_url)
            elif media_type == "image/svg+xml":
                text = title + self.process_svg(absolute_url, ocr_languages)
            else:
                # Unsupported attachment type.
                continue
        except requests.HTTPError as e:
            # `response` can be None on an HTTPError, so read the status
            # defensively; only a missing attachment is tolerated.
            if getattr(e.response, "status_code", None) == 404:
                logger.warning("Attachment not found at %s", absolute_url)
                continue
            raise
        texts.append(text)

    return texts
def process_pdf(
    self,
    link: str,
    ocr_languages: Optional[str] = None,
) -> str:
    """Download a PDF attachment and OCR each of its pages.

    :param link: Absolute URL of the attachment
    :param ocr_languages: Passed to `pytesseract` as `lang`
    :raises ImportError: If `pytesseract` or `pdf2image` is not installed
    :return: The page texts, each introduced by ``"Page <n>:"``, or an empty
        string when the download fails, is empty or cannot be converted
    """
    try:
        import pytesseract  # noqa: F401
        from pdf2image import convert_from_bytes  # noqa: F401
    except ImportError:
        raise ImportError(
            "`pytesseract` or `pdf2image` package not found, "
            "please run `pip install pytesseract pdf2image`"
        )

    response = self.confluence.request(path=link, absolute=True)
    if response.status_code != 200 or response.content in (b"", None):
        return ""

    try:
        images = convert_from_bytes(response.content)
    except ValueError:
        return ""

    page_texts = []
    for page_number, image in enumerate(images, start=1):
        image_text = pytesseract.image_to_string(image, lang=ocr_languages)
        page_texts.append(f"Page {page_number}:\n{image_text}\n\n")
    return "".join(page_texts)
def process_image(
    self,
    link: str,
    ocr_languages: Optional[str] = None,
) -> str:
    """Download an image attachment and OCR it.

    :param link: Absolute URL of the attachment
    :param ocr_languages: Passed to `pytesseract` as `lang`
    :raises ImportError: If `pytesseract` or `Pillow` is not installed
    :return: The recognised text, or an empty string when the download fails,
        is empty or cannot be opened as an image
    """
    try:
        import pytesseract  # noqa: F401
        from PIL import Image  # noqa: F401
    except ImportError:
        raise ImportError(
            "`pytesseract` or `Pillow` package not found, "
            "please run `pip install pytesseract Pillow`"
        )

    response = self.confluence.request(path=link, absolute=True)
    download_failed = response.status_code != 200 or response.content in (
        b"",
        None,
    )
    if download_failed:
        return ""

    try:
        image = Image.open(BytesIO(response.content))
    except OSError:
        return ""

    return pytesseract.image_to_string(image, lang=ocr_languages)
def process_doc(self, link: str) -> str:
    """Download a Word attachment and extract its text with `docx2txt`.

    :param link: Absolute URL of the attachment
    :raises ImportError: If `docx2txt` is not installed
    :return: The extracted text, or an empty string when the download fails
        or is empty
    """
    try:
        import docx2txt  # noqa: F401
    except ImportError:
        raise ImportError(
            "`docx2txt` package not found, please run `pip install docx2txt`"
        )

    response = self.confluence.request(path=link, absolute=True)
    if response.status_code != 200 or response.content in (b"", None):
        return ""

    return docx2txt.process(BytesIO(response.content))
def process_xls(self, link: str) -> str:
    """Download a spreadsheet attachment and render its cells as text.

    A link whose file extension starts with ``.csv`` is parsed with pandas;
    anything else is opened as a workbook with `xlrd`, emitting every sheet as
    its name followed by tab-separated rows.

    :param link: Absolute URL of the attachment
    :raises ImportError: If `xlrd` or `pandas` is not installed
    :return: The rendered text, or an empty string when the download fails or
        is empty
    """
    import io
    import os

    try:
        import xlrd  # noqa: F401
    except ImportError:
        raise ImportError("`xlrd` package not found, please run `pip install xlrd`")
    try:
        import pandas as pd
    except ImportError:
        raise ImportError(
            "`pandas` package not found, please run `pip install pandas`"
        )

    response = self.confluence.request(path=link, absolute=True)
    if response.status_code != 200 or response.content in (b"", None):
        return ""

    # Everything after the file name's dot is kept, query string included,
    # Example: ".csv?version=2&modificationDate=1631800010678&cacheVersion=1&api=v2"
    # which is why the extension is matched by prefix.
    file_extension = os.path.splitext(os.path.basename(link))[1]

    if file_extension.startswith(".csv"):
        frame = pd.read_csv(io.StringIO(response.content.decode("utf-8")))
        return frame.to_string(index=False, header=False) + "\n\n"

    workbook = xlrd.open_workbook(file_contents=response.content)
    pieces = []
    for sheet in workbook.sheets():
        pieces.append(f"{sheet.name}:\n")
        for row in range(sheet.nrows):
            pieces.extend(
                f"{sheet.cell_value(row, col)}\t" for col in range(sheet.ncols)
            )
            pieces.append("\n")
        pieces.append("\n")
    return "".join(pieces)
def process_svg(
    self,
    link: str,
    ocr_languages: Optional[str] = None,
) -> str:
    """Download an SVG attachment, rasterise it to PNG and OCR the result.

    :param link: Absolute URL of the attachment
    :param ocr_languages: Passed to `pytesseract` as `lang`
    :raises ImportError: If `pytesseract`, `Pillow`, `reportlab` or `svglib`
        is not installed
    :return: The recognised text, or an empty string when the download fails
        or is empty
    """
    try:
        import pytesseract  # noqa: F401
        from PIL import Image  # noqa: F401
        from reportlab.graphics import renderPM  # noqa: F401
        from svglib.svglib import svg2rlg  # noqa: F401
    except ImportError:
        raise ImportError(
            "`pytesseract`, `Pillow`, `reportlab` or `svglib` package not found, "
            "please run `pip install pytesseract Pillow reportlab svglib`"
        )

    response = self.confluence.request(path=link, absolute=True)
    if response.status_code != 200 or response.content in (b"", None):
        return ""

    # Render the drawing into an in-memory PNG and rewind the buffer so that
    # Pillow reads it from the start.
    png_buffer = BytesIO()
    renderPM.drawToFile(svg2rlg(BytesIO(response.content)), png_buffer, fmt="PNG")
    png_buffer.seek(0)

    return pytesseract.image_to_string(Image.open(png_buffer), lang=ocr_languages)