Spaces:
Runtime error
Runtime error
import json | |
import logging | |
import time | |
from typing import List | |
import requests | |
from langchain.docstore.document import Document | |
from langchain.document_loaders.base import BaseLoader | |
logger = logging.getLogger(__name__)


class CubeSemanticLoader(BaseLoader):
    """Load `Cube semantic layer` metadata.

    Args:
        cube_api_url: REST API endpoint.
            Use the REST API of your Cube's deployment.
            Please find out more information here:
            https://cube.dev/docs/http-api/rest#configuration-base-path
        cube_api_token: Cube API token.
            Authentication tokens are generated based on your Cube's API secret.
            Please find out more information here:
            https://cube.dev/docs/security#generating-json-web-tokens-jwt
        load_dimension_values: Whether to load dimension values for every string
            dimension or not.
        dimension_values_limit: Maximum number of dimension values to load.
        dimension_values_max_retries: Maximum number of retries to load dimension
            values.
        dimension_values_retry_delay: Delay between retries to load dimension
            values; passed directly to ``time.sleep``.
    """

    def __init__(
        self,
        cube_api_url: str,
        cube_api_token: str,
        load_dimension_values: bool = True,
        dimension_values_limit: int = 10_000,
        dimension_values_max_retries: int = 10,
        dimension_values_retry_delay: int = 3,
    ):
        self.cube_api_url = cube_api_url
        self.cube_api_token = cube_api_token
        self.load_dimension_values = load_dimension_values
        self.dimension_values_limit = dimension_values_limit
        self.dimension_values_max_retries = dimension_values_max_retries
        self.dimension_values_retry_delay = dimension_values_retry_delay

    def _request_headers(self) -> dict:
        """Build the HTTP headers sent with every request to the Cube API."""
        return {
            "Content-Type": "application/json",
            "Authorization": self.cube_api_token,
        }

    def _get_dimension_values(self, dimension_name: str) -> List[str]:
        """Query the ``/load`` endpoint for the values of one dimension.

        These values can be used to achieve a more accurate filtering.

        The request is repeated while the API answers HTTP 200 with the
        ``"Continue wait"`` error marker, sleeping
        ``dimension_values_retry_delay`` between attempts, up to
        ``dimension_values_max_retries`` attempts.

        Args:
            dimension_name: Name of the dimension to fetch values for.

        Returns:
            The value found under ``dimension_name`` in every row of the
            response's ``"data"`` list, or an empty list when the request
            fails with a non-200 status or the retry budget is exhausted.
        """
        logger.info("Loading dimension values for: %s...", dimension_name)

        headers = self._request_headers()
        # The payload does not change between attempts, so serialize it once.
        payload = json.dumps(
            {
                "query": {
                    "dimensions": [dimension_name],
                    "limit": self.dimension_values_limit,
                }
            }
        )

        retries = 0
        while retries < self.dimension_values_max_retries:
            response = requests.request(
                "POST",
                f"{self.cube_api_url}/load",
                headers=headers,
                data=payload,
            )

            if response.status_code != 200:
                logger.error(
                    "Request failed with status code: %s", response.status_code
                )
                break

            response_data = response.json()
            if (
                "error" in response_data
                and response_data["error"] == "Continue wait"
            ):
                # The result is not ready yet: wait and ask again.
                logger.info("Retrying...")
                retries += 1
                time.sleep(self.dimension_values_retry_delay)
                continue

            return [item[dimension_name] for item in response_data["data"]]

        if retries == self.dimension_values_max_retries:
            logger.info("Maximum retries reached.")
        return []

    def load(self) -> List[Document]:
        """Fetch the ``/meta`` endpoint and turn every member into a document.

        Cubes whose ``public`` flag is falsy are skipped. For string
        dimensions, the dimension values are additionally fetched when
        ``load_dimension_values`` is enabled.

        Returns:
            A list of documents with attributes:
                - page_content=column_title + column_description
                - metadata
                    - table_name
                    - column_name
                    - column_data_type
                    - column_member_type
                    - column_title
                    - column_description
                    - column_values
                    - cube_data_obj_type

        Raises:
            requests.HTTPError: If the metadata request returns an error status
                (raised by ``response.raise_for_status()``).
            ValueError: If the metadata contains no cubes.
        """
        logger.info("Loading metadata from %s...", self.cube_api_url)
        response = requests.get(
            f"{self.cube_api_url}/meta", headers=self._request_headers()
        )
        response.raise_for_status()

        cube_data_objects = response.json().get("cubes", [])
        logger.info(
            "Found %d cube data objects in metadata.", len(cube_data_objects)
        )
        if not cube_data_objects:
            raise ValueError("No cubes found in metadata.")

        docs = []
        for cube_data_obj in cube_data_objects:
            cube_data_obj_name = cube_data_obj.get("name")
            cube_data_obj_type = cube_data_obj.get("type")
            measures = cube_data_obj.get("measures", [])
            dimensions = cube_data_obj.get("dimensions", [])

            logger.info("Processing %s...", cube_data_obj_name)
            if not cube_data_obj.get("public"):
                logger.info(
                    "Skipping %s because it is not public.", cube_data_obj_name
                )
                continue

            # Tag each member by the list it came from. Comparing dicts with
            # ``item in measures`` is a linear scan per item and mislabels a
            # dimension whose dict happens to equal a measure's dict.
            member_groups = (("measure", measures), ("dimension", dimensions))
            for column_member_type, items in member_groups:
                for item in items:
                    item_name = str(item.get("name"))
                    item_type = str(item.get("type"))
                    item_title = str(item.get("title"))
                    item_description = str(item.get("description"))

                    dimension_values = []
                    if (
                        self.load_dimension_values
                        and column_member_type == "dimension"
                        and item_type == "string"
                    ):
                        dimension_values = self._get_dimension_values(item_name)

                    metadata = dict(
                        table_name=str(cube_data_obj_name),
                        column_name=item_name,
                        column_data_type=item_type,
                        column_title=item_title,
                        column_description=item_description,
                        column_member_type=column_member_type,
                        column_values=dimension_values,
                        cube_data_obj_type=cube_data_obj_type,
                    )
                    page_content = f"{item_title}, {item_description}"
                    docs.append(
                        Document(page_content=page_content, metadata=metadata)
                    )

        return docs