| | import ast |
| | import contextlib |
| | import re |
| | import traceback |
| | from typing import Any |
| | from uuid import UUID |
| |
|
| | from fastapi import HTTPException |
| | from loguru import logger |
| | from pydantic import BaseModel |
| |
|
| | from langflow.custom import CustomComponent |
| | from langflow.custom.custom_component.component import Component |
| | from langflow.custom.directory_reader.utils import ( |
| | abuild_custom_component_list_from_path, |
| | build_custom_component_list_from_path, |
| | merge_nested_dicts_with_renaming, |
| | ) |
| | from langflow.custom.eval import eval_custom_component_code |
| | from langflow.custom.schema import MissingDefault |
| | from langflow.field_typing.range_spec import RangeSpec |
| | from langflow.helpers.custom import format_type |
| | from langflow.schema import dotdict |
| | from langflow.template.field.base import Input |
| | from langflow.template.frontend_node.custom_components import ComponentFrontendNode, CustomComponentFrontendNode |
| | from langflow.type_extraction.type_extraction import extract_inner_type |
| | from langflow.utils import validate |
| | from langflow.utils.util import get_base_classes |
| |
|
| |
|
| | class UpdateBuildConfigError(Exception): |
| | pass |
| |
|
| |
|
| | def add_output_types(frontend_node: CustomComponentFrontendNode, return_types: list[str]) -> None: |
| | """Add output types to the frontend node.""" |
| | for return_type in return_types: |
| | if return_type is None: |
| | raise HTTPException( |
| | status_code=400, |
| | detail={ |
| | "error": ("Invalid return type. Please check your code and try again."), |
| | "traceback": traceback.format_exc(), |
| | }, |
| | ) |
| | if return_type is str: |
| | return_type_ = "Text" |
| | elif hasattr(return_type, "__name__"): |
| | return_type_ = return_type.__name__ |
| | elif hasattr(return_type, "__class__"): |
| | return_type_ = return_type.__class__.__name__ |
| | else: |
| | return_type_ = str(return_type) |
| |
|
| | frontend_node.add_output_type(return_type_) |
| |
|
| |
|
| | def reorder_fields(frontend_node: CustomComponentFrontendNode, field_order: list[str]) -> None: |
| | """Reorder fields in the frontend node based on the specified field_order.""" |
| | if not field_order: |
| | return |
| |
|
| | |
| | field_dict = {field.name: field for field in frontend_node.template.fields} |
| | reordered_fields = [field_dict[name] for name in field_order if name in field_dict] |
| | |
| | reordered_fields.extend(field for field in frontend_node.template.fields if field.name not in field_order) |
| | frontend_node.template.fields = reordered_fields |
| | frontend_node.field_order = field_order |
| |
|
| |
|
| | def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: list[str]) -> None: |
| | """Add base classes to the frontend node.""" |
| | for return_type_instance in return_types: |
| | if return_type_instance is None: |
| | raise HTTPException( |
| | status_code=400, |
| | detail={ |
| | "error": ("Invalid return type. Please check your code and try again."), |
| | "traceback": traceback.format_exc(), |
| | }, |
| | ) |
| |
|
| | base_classes = get_base_classes(return_type_instance) |
| | if return_type_instance is str: |
| | base_classes.append("Text") |
| |
|
| | for base_class in base_classes: |
| | frontend_node.add_base_class(base_class) |
| |
|
| |
|
| | def extract_type_from_optional(field_type): |
| | """Extract the type from a string formatted as "Optional[<type>]". |
| | |
| | Parameters: |
| | field_type (str): The string from which to extract the type. |
| | |
| | Returns: |
| | str: The extracted type, or an empty string if no type was found. |
| | """ |
| | if "optional" not in field_type.lower(): |
| | return field_type |
| | match = re.search(r"\[(.*?)\]$", field_type) |
| | return match[1] if match else field_type |
| |
|
| |
|
| | def get_field_properties(extra_field): |
| | """Get the properties of an extra field.""" |
| | field_name = extra_field["name"] |
| | field_type = extra_field.get("type", "str") |
| | field_value = extra_field.get("default", "") |
| | |
| | |
| | |
| | field_required = "optional" not in field_type.lower() and isinstance(field_value, MissingDefault) |
| | field_value = field_value if not isinstance(field_value, MissingDefault) else None |
| |
|
| | if not field_required: |
| | field_type = extract_type_from_optional(field_type) |
| | if field_value is not None: |
| | with contextlib.suppress(Exception): |
| | field_value = ast.literal_eval(field_value) |
| | return field_name, field_type, field_value, field_required |
| |
|
| |
|
| | def process_type(field_type: str): |
| | if field_type.startswith(("list", "List")): |
| | return extract_inner_type(field_type) |
| |
|
| | |
| | |
| | lowercase_type = field_type.lower() |
| | if lowercase_type in {"prompt", "code"}: |
| | return lowercase_type |
| | return field_type |
| |
|
| |
|
| | def add_new_custom_field( |
| | *, |
| | frontend_node: CustomComponentFrontendNode, |
| | field_name: str, |
| | field_type: str, |
| | field_value: Any, |
| | field_required: bool, |
| | field_config: dict, |
| | ): |
| | |
| | |
| | display_name = field_config.pop("display_name", None) |
| | if not field_type: |
| | if "type" in field_config and field_config["type"] is not None: |
| | field_type = field_config.pop("type") |
| | elif "field_type" in field_config and field_config["field_type"] is not None: |
| | field_type = field_config.pop("field_type") |
| | field_contains_list = "list" in field_type.lower() |
| | field_type = process_type(field_type) |
| | field_value = field_config.pop("value", field_value) |
| | field_advanced = field_config.pop("advanced", False) |
| |
|
| | if field_type == "Dict": |
| | field_type = "dict" |
| |
|
| | if field_type == "bool" and field_value is None: |
| | field_value = False |
| |
|
| | if field_type == "SecretStr": |
| | field_config["password"] = True |
| | field_config["load_from_db"] = True |
| | field_config["input_types"] = ["Text"] |
| |
|
| | |
| | |
| | is_list = isinstance(field_config.get("options"), list) |
| | field_config["is_list"] = is_list or field_config.get("list", False) or field_contains_list |
| |
|
| | if "name" in field_config: |
| | logger.warning("The 'name' key in field_config is used to build the object and can't be changed.") |
| | required = field_config.pop("required", field_required) |
| | placeholder = field_config.pop("placeholder", "") |
| |
|
| | new_field = Input( |
| | name=field_name, |
| | field_type=field_type, |
| | value=field_value, |
| | show=True, |
| | required=required, |
| | advanced=field_advanced, |
| | placeholder=placeholder, |
| | display_name=display_name, |
| | **sanitize_field_config(field_config), |
| | ) |
| | frontend_node.template.upsert_field(field_name, new_field) |
| | if isinstance(frontend_node.custom_fields, dict): |
| | frontend_node.custom_fields[field_name] = None |
| |
|
| | return frontend_node |
| |
|
| |
|
| | def add_extra_fields(frontend_node, field_config, function_args) -> None: |
| | """Add extra fields to the frontend node.""" |
| | if not function_args: |
| | return |
| | field_config_ = field_config.copy() |
| | function_args_names = [arg["name"] for arg in function_args] |
| | |
| | |
| |
|
| | for extra_field in function_args: |
| | if "name" not in extra_field or extra_field["name"] in { |
| | "self", |
| | "kwargs", |
| | "args", |
| | }: |
| | continue |
| |
|
| | field_name, field_type, field_value, field_required = get_field_properties(extra_field) |
| | config = field_config_.pop(field_name, {}) |
| | frontend_node = add_new_custom_field( |
| | frontend_node=frontend_node, |
| | field_name=field_name, |
| | field_type=field_type, |
| | field_value=field_value, |
| | field_required=field_required, |
| | field_config=config, |
| | ) |
| | if "kwargs" in function_args_names and not all(key in function_args_names for key in field_config): |
| | for field_name, config in field_config_.items(): |
| | if "name" not in config or field_name == "code": |
| | continue |
| | config_ = config.model_dump() if isinstance(config, BaseModel) else config |
| | field_name_, field_type, field_value, field_required = get_field_properties(extra_field=config_) |
| | frontend_node = add_new_custom_field( |
| | frontend_node=frontend_node, |
| | field_name=field_name_, |
| | field_type=field_type, |
| | field_value=field_value, |
| | field_required=field_required, |
| | field_config=config_, |
| | ) |
| |
|
| |
|
| | def get_field_dict(field: Input | dict): |
| | """Get the field dictionary from a Input or a dict.""" |
| | if isinstance(field, Input): |
| | return dotdict(field.model_dump(by_alias=True, exclude_none=True)) |
| | return field |
| |
|
| |
|
| | def run_build_inputs( |
| | custom_component: Component, |
| | ): |
| | """Run the build inputs of a custom component.""" |
| | try: |
| | return custom_component.build_inputs() |
| | |
| | except Exception as exc: |
| | logger.exception("Error running build inputs") |
| | raise HTTPException(status_code=500, detail=str(exc)) from exc |
| |
|
| |
|
| | def get_component_instance(custom_component: CustomComponent, user_id: str | UUID | None = None): |
| | if custom_component._code is None: |
| | error = "Code is None" |
| | elif not isinstance(custom_component._code, str): |
| | error = "Invalid code type" |
| | else: |
| | try: |
| | custom_class = eval_custom_component_code(custom_component._code) |
| | except Exception as exc: |
| | logger.exception("Error while evaluating custom component code") |
| | raise HTTPException( |
| | status_code=400, |
| | detail={ |
| | "error": ("Invalid type conversion. Please check your code and try again."), |
| | "traceback": traceback.format_exc(), |
| | }, |
| | ) from exc |
| |
|
| | try: |
| | return custom_class(_user_id=user_id, _code=custom_component._code) |
| | except Exception as exc: |
| | logger.exception("Error while instantiating custom component") |
| | if hasattr(exc, "detail") and "traceback" in exc.detail: |
| | logger.error(exc.detail["traceback"]) |
| |
|
| | raise |
| |
|
| | msg = f"Invalid type conversion: {error}. Please check your code and try again." |
| | logger.error(msg) |
| | raise HTTPException( |
| | status_code=400, |
| | detail={"error": msg}, |
| | ) |
| |
|
| |
|
| | def run_build_config( |
| | custom_component: CustomComponent, |
| | user_id: str | UUID | None = None, |
| | ) -> tuple[dict, CustomComponent]: |
| | """Build the field configuration for a custom component.""" |
| | if custom_component._code is None: |
| | error = "Code is None" |
| | elif not isinstance(custom_component._code, str): |
| | error = "Invalid code type" |
| | else: |
| | try: |
| | custom_class = eval_custom_component_code(custom_component._code) |
| | except Exception as exc: |
| | logger.exception("Error while evaluating custom component code") |
| | raise HTTPException( |
| | status_code=400, |
| | detail={ |
| | "error": ("Invalid type conversion. Please check your code and try again."), |
| | "traceback": traceback.format_exc(), |
| | }, |
| | ) from exc |
| |
|
| | try: |
| | custom_instance = custom_class(_user_id=user_id) |
| | build_config: dict = custom_instance.build_config() |
| |
|
| | for field_name, field in build_config.copy().items(): |
| | |
| | |
| | field_dict = get_field_dict(field) |
| | |
| | if "rangeSpec" in field_dict and isinstance(field_dict["rangeSpec"], RangeSpec): |
| | field_dict["rangeSpec"] = field_dict["rangeSpec"].model_dump() |
| | build_config[field_name] = field_dict |
| |
|
| | except Exception as exc: |
| | logger.exception("Error while building field config") |
| | if hasattr(exc, "detail") and "traceback" in exc.detail: |
| | logger.error(exc.detail["traceback"]) |
| | raise |
| | return build_config, custom_instance |
| |
|
| | msg = f"Invalid type conversion: {error}. Please check your code and try again." |
| | logger.error(msg) |
| | raise HTTPException( |
| | status_code=400, |
| | detail={"error": msg}, |
| | ) |
| |
|
| |
|
| | def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code): |
| | code_field = Input( |
| | dynamic=True, |
| | required=True, |
| | placeholder="", |
| | multiline=True, |
| | value=raw_code, |
| | password=False, |
| | name="code", |
| | advanced=True, |
| | field_type="code", |
| | is_list=False, |
| | ) |
| | frontend_node.template.add_field(code_field) |
| |
|
| | return frontend_node |
| |
|
| |
|
| | def build_custom_component_template_from_inputs( |
| | custom_component: Component | CustomComponent, user_id: str | UUID | None = None |
| | ): |
| | |
| | cc_instance = get_component_instance(custom_component, user_id=user_id) |
| | field_config = cc_instance.get_template_config(cc_instance) |
| | frontend_node = ComponentFrontendNode.from_inputs(**field_config) |
| | frontend_node = add_code_field(frontend_node, custom_component._code) |
| | |
| | for output in frontend_node.outputs: |
| | if output.types: |
| | continue |
| | return_types = cc_instance.get_method_return_type(output.method) |
| | return_types = [format_type(return_type) for return_type in return_types] |
| | output.add_types(return_types) |
| | output.set_selected() |
| | |
| | frontend_node.validate_component() |
| | |
| | frontend_node.set_base_classes_from_outputs() |
| | reorder_fields(frontend_node, cc_instance._get_field_order()) |
| |
|
| | return frontend_node.to_dict(keep_name=False), cc_instance |
| |
|
| |
|
| | def build_custom_component_template( |
| | custom_component: CustomComponent, |
| | user_id: str | UUID | None = None, |
| | ) -> tuple[dict[str, Any], CustomComponent | Component]: |
| | """Build a custom component template.""" |
| | try: |
| | has_template_config = hasattr(custom_component, "template_config") |
| | except Exception as exc: |
| | raise HTTPException( |
| | status_code=400, |
| | detail={ |
| | "error": (f"Error building Component: {exc}"), |
| | "traceback": traceback.format_exc(), |
| | }, |
| | ) from exc |
| | if not has_template_config: |
| | raise HTTPException( |
| | status_code=400, |
| | detail={ |
| | "error": ("Error building Component. Please check if you are importing Component correctly."), |
| | }, |
| | ) |
| | try: |
| | if "inputs" in custom_component.template_config: |
| | return build_custom_component_template_from_inputs(custom_component, user_id=user_id) |
| | frontend_node = CustomComponentFrontendNode(**custom_component.template_config) |
| |
|
| | field_config, custom_instance = run_build_config( |
| | custom_component, |
| | user_id=user_id, |
| | ) |
| |
|
| | entrypoint_args = custom_component.get_function_entrypoint_args |
| |
|
| | add_extra_fields(frontend_node, field_config, entrypoint_args) |
| |
|
| | frontend_node = add_code_field(frontend_node, custom_component._code) |
| |
|
| | add_base_classes(frontend_node, custom_component._get_function_entrypoint_return_type) |
| | add_output_types(frontend_node, custom_component._get_function_entrypoint_return_type) |
| |
|
| | reorder_fields(frontend_node, custom_instance._get_field_order()) |
| |
|
| | return frontend_node.to_dict(keep_name=False), custom_instance |
| | except Exception as exc: |
| | if isinstance(exc, HTTPException): |
| | raise |
| | raise HTTPException( |
| | status_code=400, |
| | detail={ |
| | "error": (f"Error building Component: {exc}"), |
| | "traceback": traceback.format_exc(), |
| | }, |
| | ) from exc |
| |
|
| |
|
| | def create_component_template(component): |
| | """Create a template for a component.""" |
| | component_code = component["code"] |
| | component_output_types = component["output_types"] |
| |
|
| | component_extractor = Component(_code=component_code) |
| |
|
| | component_template, component_instance = build_custom_component_template(component_extractor) |
| | if not component_template["output_types"] and component_output_types: |
| | component_template["output_types"] = component_output_types |
| |
|
| | return component_template, component_instance |
| |
|
| |
|
| | def build_custom_components(components_paths: list[str]): |
| | """Build custom components from the specified paths.""" |
| | if not components_paths: |
| | return {} |
| |
|
| | logger.info(f"Building custom components from {components_paths}") |
| | custom_components_from_file: dict = {} |
| | processed_paths = set() |
| | for path in components_paths: |
| | path_str = str(path) |
| | if path_str in processed_paths: |
| | continue |
| |
|
| | custom_component_dict = build_custom_component_list_from_path(path_str) |
| | if custom_component_dict: |
| | category = next(iter(custom_component_dict)) |
| | logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}") |
| | custom_components_from_file = merge_nested_dicts_with_renaming( |
| | custom_components_from_file, custom_component_dict |
| | ) |
| | processed_paths.add(path_str) |
| |
|
| | return custom_components_from_file |
| |
|
| |
|
| | async def abuild_custom_components(components_paths: list[str]): |
| | """Build custom components from the specified paths.""" |
| | if not components_paths: |
| | return {} |
| |
|
| | logger.info(f"Building custom components from {components_paths}") |
| | custom_components_from_file: dict = {} |
| | processed_paths = set() |
| | for path in components_paths: |
| | path_str = str(path) |
| | if path_str in processed_paths: |
| | continue |
| |
|
| | custom_component_dict = await abuild_custom_component_list_from_path(path_str) |
| | if custom_component_dict: |
| | category = next(iter(custom_component_dict)) |
| | logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}") |
| | custom_components_from_file = merge_nested_dicts_with_renaming( |
| | custom_components_from_file, custom_component_dict |
| | ) |
| | processed_paths.add(path_str) |
| |
|
| | return custom_components_from_file |
| |
|
| |
|
| | def update_field_dict( |
| | custom_component_instance: "CustomComponent", |
| | field_dict: dict, |
| | build_config: dict, |
| | *, |
| | update_field: str | None = None, |
| | update_field_value: Any | None = None, |
| | call: bool = False, |
| | ): |
| | """Update the field dictionary by calling options() or value() if they are callable.""" |
| | if ( |
| | ("real_time_refresh" in field_dict or "refresh_button" in field_dict) |
| | and any( |
| | ( |
| | field_dict.get("real_time_refresh", False), |
| | field_dict.get("refresh_button", False), |
| | ) |
| | ) |
| | and call |
| | ): |
| | try: |
| | dd_build_config = dotdict(build_config) |
| | custom_component_instance.update_build_config( |
| | build_config=dd_build_config, |
| | field_value=update_field, |
| | field_name=update_field_value, |
| | ) |
| | build_config = dd_build_config |
| | except Exception as exc: |
| | msg = f"Error while running update_build_config: {exc}" |
| | logger.exception(msg) |
| | raise UpdateBuildConfigError(msg) from exc |
| |
|
| | return build_config |
| |
|
| |
|
| | def sanitize_field_config(field_config: dict | Input): |
| | |
| | field_dict = field_config.to_dict() if isinstance(field_config, Input) else field_config |
| | for key in [ |
| | "name", |
| | "field_type", |
| | "value", |
| | "required", |
| | "placeholder", |
| | "display_name", |
| | "advanced", |
| | "show", |
| | ]: |
| | field_dict.pop(key, None) |
| |
|
| | |
| | field_dict.pop("field_type", None) |
| | field_dict.pop("type", None) |
| |
|
| | return field_dict |
| |
|
| |
|
| | def build_component(component): |
| | """Build a single component.""" |
| | component_template, component_instance = create_component_template(component) |
| | component_name = get_instance_name(component_instance) |
| | return component_name, component_template |
| |
|
| |
|
| | def get_function(code): |
| | """Get the function.""" |
| | function_name = validate.extract_function_name(code) |
| |
|
| | return validate.create_function(code, function_name) |
| |
|
| |
|
| | def get_instance_name(instance): |
| | name = instance.__class__.__name__ |
| | if hasattr(instance, "name") and instance.name: |
| | name = instance.name |
| | return name |
| |
|