Spaces:
Build error
Build error
import json | |
import logging | |
from typing import Optional, Union | |
from configs import dify_config | |
from core.tools.entities.api_entities import UserTool, UserToolProvider | |
from core.tools.entities.common_entities import I18nObject | |
from core.tools.entities.tool_bundle import ApiToolBundle | |
from core.tools.entities.tool_entities import ( | |
ApiProviderAuthType, | |
ToolParameter, | |
ToolProviderCredentials, | |
ToolProviderType, | |
) | |
from core.tools.provider.api_tool_provider import ApiToolProviderController | |
from core.tools.provider.builtin_tool_provider import BuiltinToolProviderController | |
from core.tools.provider.workflow_tool_provider import WorkflowToolProviderController | |
from core.tools.tool.tool import Tool | |
from core.tools.tool.workflow_tool import WorkflowTool | |
from core.tools.utils.configuration import ToolConfigurationManager | |
from models.tools import ApiToolProvider, BuiltinToolProvider, WorkflowToolProvider | |
logger = logging.getLogger(__name__) | |
class ToolTransformService: | |
def get_tool_provider_icon_url(provider_type: str, provider_name: str, icon: str) -> Union[str, dict]: | |
""" | |
get tool provider icon url | |
""" | |
url_prefix = dify_config.CONSOLE_API_URL + "/console/api/workspaces/current/tool-provider/" | |
if provider_type == ToolProviderType.BUILT_IN.value: | |
return url_prefix + "builtin/" + provider_name + "/icon" | |
elif provider_type in {ToolProviderType.API.value, ToolProviderType.WORKFLOW.value}: | |
try: | |
return json.loads(icon) | |
except: | |
return {"background": "#252525", "content": "\ud83d\ude01"} | |
return "" | |
def repack_provider(provider: Union[dict, UserToolProvider]): | |
""" | |
repack provider | |
:param provider: the provider dict | |
""" | |
if isinstance(provider, dict) and "icon" in provider: | |
provider["icon"] = ToolTransformService.get_tool_provider_icon_url( | |
provider_type=provider["type"], provider_name=provider["name"], icon=provider["icon"] | |
) | |
elif isinstance(provider, UserToolProvider): | |
provider.icon = ToolTransformService.get_tool_provider_icon_url( | |
provider_type=provider.type.value, provider_name=provider.name, icon=provider.icon | |
) | |
def builtin_provider_to_user_provider( | |
provider_controller: BuiltinToolProviderController, | |
db_provider: Optional[BuiltinToolProvider], | |
decrypt_credentials: bool = True, | |
) -> UserToolProvider: | |
""" | |
convert provider controller to user provider | |
""" | |
result = UserToolProvider( | |
id=provider_controller.identity.name, | |
author=provider_controller.identity.author, | |
name=provider_controller.identity.name, | |
description=I18nObject( | |
en_US=provider_controller.identity.description.en_US, | |
zh_Hans=provider_controller.identity.description.zh_Hans, | |
pt_BR=provider_controller.identity.description.pt_BR, | |
ja_JP=provider_controller.identity.description.ja_JP, | |
), | |
icon=provider_controller.identity.icon, | |
label=I18nObject( | |
en_US=provider_controller.identity.label.en_US, | |
zh_Hans=provider_controller.identity.label.zh_Hans, | |
pt_BR=provider_controller.identity.label.pt_BR, | |
ja_JP=provider_controller.identity.label.ja_JP, | |
), | |
type=ToolProviderType.BUILT_IN, | |
masked_credentials={}, | |
is_team_authorization=False, | |
tools=[], | |
labels=provider_controller.tool_labels, | |
) | |
# get credentials schema | |
schema = provider_controller.get_credentials_schema() | |
for name, value in schema.items(): | |
result.masked_credentials[name] = ToolProviderCredentials.CredentialsType.default(value.type) | |
# check if the provider need credentials | |
if not provider_controller.need_credentials: | |
result.is_team_authorization = True | |
result.allow_delete = False | |
elif db_provider: | |
result.is_team_authorization = True | |
if decrypt_credentials: | |
credentials = db_provider.credentials | |
# init tool configuration | |
tool_configuration = ToolConfigurationManager( | |
tenant_id=db_provider.tenant_id, provider_controller=provider_controller | |
) | |
# decrypt the credentials and mask the credentials | |
decrypted_credentials = tool_configuration.decrypt_tool_credentials(credentials=credentials) | |
masked_credentials = tool_configuration.mask_tool_credentials(credentials=decrypted_credentials) | |
result.masked_credentials = masked_credentials | |
result.original_credentials = decrypted_credentials | |
return result | |
def api_provider_to_controller( | |
db_provider: ApiToolProvider, | |
) -> ApiToolProviderController: | |
""" | |
convert provider controller to user provider | |
""" | |
# package tool provider controller | |
controller = ApiToolProviderController.from_db( | |
db_provider=db_provider, | |
auth_type=ApiProviderAuthType.API_KEY | |
if db_provider.credentials["auth_type"] == "api_key" | |
else ApiProviderAuthType.NONE, | |
) | |
return controller | |
def workflow_provider_to_controller(db_provider: WorkflowToolProvider) -> WorkflowToolProviderController: | |
""" | |
convert provider controller to provider | |
""" | |
return WorkflowToolProviderController.from_db(db_provider) | |
def workflow_provider_to_user_provider( | |
provider_controller: WorkflowToolProviderController, labels: Optional[list[str]] = None | |
): | |
""" | |
convert provider controller to user provider | |
""" | |
return UserToolProvider( | |
id=provider_controller.provider_id, | |
author=provider_controller.identity.author, | |
name=provider_controller.identity.name, | |
description=I18nObject( | |
en_US=provider_controller.identity.description.en_US, | |
zh_Hans=provider_controller.identity.description.zh_Hans, | |
), | |
icon=provider_controller.identity.icon, | |
label=I18nObject( | |
en_US=provider_controller.identity.label.en_US, | |
zh_Hans=provider_controller.identity.label.zh_Hans, | |
), | |
type=ToolProviderType.WORKFLOW, | |
masked_credentials={}, | |
is_team_authorization=True, | |
tools=[], | |
labels=labels or [], | |
) | |
def api_provider_to_user_provider( | |
provider_controller: ApiToolProviderController, | |
db_provider: ApiToolProvider, | |
decrypt_credentials: bool = True, | |
labels: Optional[list[str]] = None, | |
) -> UserToolProvider: | |
""" | |
convert provider controller to user provider | |
""" | |
username = "Anonymous" | |
try: | |
username = db_provider.user.name | |
except Exception as e: | |
logger.error(f"failed to get user name for api provider {db_provider.id}: {str(e)}") | |
# add provider into providers | |
credentials = db_provider.credentials | |
result = UserToolProvider( | |
id=db_provider.id, | |
author=username, | |
name=db_provider.name, | |
description=I18nObject( | |
en_US=db_provider.description, | |
zh_Hans=db_provider.description, | |
), | |
icon=db_provider.icon, | |
label=I18nObject( | |
en_US=db_provider.name, | |
zh_Hans=db_provider.name, | |
), | |
type=ToolProviderType.API, | |
masked_credentials={}, | |
is_team_authorization=True, | |
tools=[], | |
labels=labels or [], | |
) | |
if decrypt_credentials: | |
# init tool configuration | |
tool_configuration = ToolConfigurationManager( | |
tenant_id=db_provider.tenant_id, provider_controller=provider_controller | |
) | |
# decrypt the credentials and mask the credentials | |
decrypted_credentials = tool_configuration.decrypt_tool_credentials(credentials=credentials) | |
masked_credentials = tool_configuration.mask_tool_credentials(credentials=decrypted_credentials) | |
result.masked_credentials = masked_credentials | |
return result | |
def tool_to_user_tool( | |
tool: Union[ApiToolBundle, WorkflowTool, Tool], | |
credentials: Optional[dict] = None, | |
tenant_id: Optional[str] = None, | |
labels: Optional[list[str]] = None, | |
) -> UserTool: | |
""" | |
convert tool to user tool | |
""" | |
if isinstance(tool, Tool): | |
# fork tool runtime | |
tool = tool.fork_tool_runtime( | |
runtime={ | |
"credentials": credentials, | |
"tenant_id": tenant_id, | |
} | |
) | |
# get tool parameters | |
parameters = tool.parameters or [] | |
# get tool runtime parameters | |
runtime_parameters = tool.get_runtime_parameters() or [] | |
# override parameters | |
current_parameters = parameters.copy() | |
for runtime_parameter in runtime_parameters: | |
found = False | |
for index, parameter in enumerate(current_parameters): | |
if parameter.name == runtime_parameter.name and parameter.form == runtime_parameter.form: | |
current_parameters[index] = runtime_parameter | |
found = True | |
break | |
if not found and runtime_parameter.form == ToolParameter.ToolParameterForm.FORM: | |
current_parameters.append(runtime_parameter) | |
return UserTool( | |
author=tool.identity.author, | |
name=tool.identity.name, | |
label=tool.identity.label, | |
description=tool.description.human, | |
parameters=current_parameters, | |
labels=labels, | |
) | |
if isinstance(tool, ApiToolBundle): | |
return UserTool( | |
author=tool.author, | |
name=tool.operation_id, | |
label=I18nObject(en_US=tool.operation_id, zh_Hans=tool.operation_id), | |
description=I18nObject(en_US=tool.summary or "", zh_Hans=tool.summary or ""), | |
parameters=tool.parameters, | |
labels=labels, | |
) | |