|
|
import copy
|
|
|
import glob
|
|
|
import inspect
|
|
|
import json
|
|
|
import os
|
|
|
import random
|
|
|
import sys
|
|
|
import re
|
|
|
from typing import Dict, List, Any, Callable, Tuple, TextIO
|
|
|
from argparse import ArgumentParser
|
|
|
|
|
|
import black
|
|
|
|
|
|
|
|
|
from comfyui_to_python_utils import (
|
|
|
import_custom_nodes,
|
|
|
find_path,
|
|
|
add_comfyui_directory_to_sys_path,
|
|
|
add_extra_model_paths,
|
|
|
get_value_at_index,
|
|
|
)
|
|
|
|
|
|
add_comfyui_directory_to_sys_path()
|
|
|
from nodes import NODE_CLASS_MAPPINGS
|
|
|
|
|
|
|
|
|
# Default CLI argument values (consumed by run() and main()).
DEFAULT_INPUT_FILE = "workflow_api.json"
DEFAULT_OUTPUT_FILE = "workflow_api.py"
# Number of times the *generated* script will execute the workflow per run.
DEFAULT_QUEUE_SIZE = 10
|
|
|
|
|
|
|
|
|
class FileHandler:
|
|
|
"""Handles reading and writing files.
|
|
|
|
|
|
This class provides methods to read JSON data from an input file and write code to an output file.
|
|
|
"""
|
|
|
|
|
|
@staticmethod
|
|
|
def read_json_file(file_path: str | TextIO, encoding: str = "utf-8") -> dict:
|
|
|
"""
|
|
|
Reads a JSON file and returns its contents as a dictionary.
|
|
|
|
|
|
Args:
|
|
|
file_path (str): The path to the JSON file.
|
|
|
|
|
|
Returns:
|
|
|
dict: The contents of the JSON file as a dictionary.
|
|
|
|
|
|
Raises:
|
|
|
FileNotFoundError: If the file is not found, it lists all JSON files in the directory of the file path.
|
|
|
ValueError: If the file is not a valid JSON.
|
|
|
"""
|
|
|
|
|
|
if hasattr(file_path, "read"):
|
|
|
return json.load(file_path)
|
|
|
with open(file_path, "r", encoding="utf-8") as file:
|
|
|
data = json.load(file)
|
|
|
return data
|
|
|
|
|
|
@staticmethod
|
|
|
def write_code_to_file(file_path: str | TextIO, code: str) -> None:
|
|
|
"""Write the specified code to a Python file.
|
|
|
|
|
|
Args:
|
|
|
file_path (str): The path to the Python file.
|
|
|
code (str): The code to write to the file.
|
|
|
|
|
|
Returns:
|
|
|
None
|
|
|
"""
|
|
|
if isinstance(file_path, str):
|
|
|
|
|
|
directory = os.path.dirname(file_path)
|
|
|
|
|
|
|
|
|
if directory and not os.path.exists(directory):
|
|
|
os.makedirs(directory)
|
|
|
|
|
|
|
|
|
with open(file_path, "w", encoding="utf-8") as file:
|
|
|
file.write(code)
|
|
|
else:
|
|
|
file_path.write(code)
|
|
|
|
|
|
|
|
|
class LoadOrderDeterminer:
    """Determine the load order of each key in the provided dictionary.

    This class places the nodes without node dependencies first, then ensures that any node whose
    result is used in another node will be added to the list in the order it should be executed.

    Attributes:
        data (Dict): The dictionary for which to determine the load order.
        node_class_mappings (Dict): Mappings of node classes.
    """

    def __init__(self, data: Dict, node_class_mappings: Dict):
        """Initialize the LoadOrderDeterminer with the given data and node class mappings.

        Args:
            data (Dict): The dictionary for which to determine the load order.
            node_class_mappings (Dict): Mappings of node classes.
        """
        self.data = data
        self.node_class_mappings = node_class_mappings
        # node_id -> True once the node has been scheduled.
        self.visited = {}
        # Accumulated (node_id, node_data, is_special_function) tuples.
        self.load_order = []
        # Flag consulted by _dfs when appending; toggled by the two passes below.
        self.is_special_function = False

    def determine_load_order(self) -> List[Tuple[str, Dict, bool]]:
        """Determine the load order for the given data.

        Returns:
            List[Tuple[str, Dict, bool]]: A list of tuples representing the load order.
        """
        # Pass 1: schedule loaders/encoders/dependency-free nodes (flagged special).
        self._load_special_functions_first()
        # Pass 2: everything else, in dependency order (flagged non-special).
        self.is_special_function = False
        for node_id in self.data:
            if node_id in self.visited:
                continue
            self._dfs(node_id)
        return self.load_order

    def _dfs(self, key: str) -> None:
        """Depth-First Search function to determine the load order.

        Args:
            key (str): The key from which to start the DFS.

        Returns:
            None
        """
        self.visited[key] = True
        # A list-valued input is a reference to another node's output;
        # schedule that upstream node before this one.
        for value in self.data[key]["inputs"].values():
            if isinstance(value, list) and value[0] not in self.visited:
                self._dfs(value[0])
        self.load_order.append((key, self.data[key], self.is_special_function))

    def _load_special_functions_first(self) -> None:
        """Load functions without dependencies, loaders, and encoders first.

        Returns:
            None
        """
        for node_id in self.data:
            node = self.data[node_id]
            class_def = self.node_class_mappings[node["class_type"]]()
            depends_on_other_nodes = any(
                isinstance(value, list) for value in node["inputs"].values()
            )
            is_special = (
                class_def.CATEGORY == "loaders"
                or class_def.FUNCTION in ["encode"]
                or not depends_on_other_nodes
            )
            if is_special:
                self.is_special_function = True
                if node_id not in self.visited:
                    self._dfs(node_id)
|
|
|
|
|
|
|
|
|
class CodeGenerator:
|
|
|
"""Generates Python code for a workflow based on the load order.
|
|
|
|
|
|
Attributes:
|
|
|
node_class_mappings (Dict): Mappings of node classes.
|
|
|
base_node_class_mappings (Dict): Base mappings of node classes.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, node_class_mappings: Dict, base_node_class_mappings: Dict):
|
|
|
"""Initialize the CodeGenerator with given node class mappings.
|
|
|
|
|
|
Args:
|
|
|
node_class_mappings (Dict): Mappings of node classes.
|
|
|
base_node_class_mappings (Dict): Base mappings of node classes.
|
|
|
"""
|
|
|
self.node_class_mappings = node_class_mappings
|
|
|
self.base_node_class_mappings = base_node_class_mappings
|
|
|
|
|
|
def generate_workflow(
|
|
|
self,
|
|
|
load_order: List,
|
|
|
queue_size: int = 10,
|
|
|
) -> str:
|
|
|
"""Generate the execution code based on the load order.
|
|
|
|
|
|
Args:
|
|
|
load_order (List): A list of tuples representing the load order.
|
|
|
queue_size (int): The number of photos that will be created by the script.
|
|
|
|
|
|
Returns:
|
|
|
str: Generated execution code as a string.
|
|
|
"""
|
|
|
|
|
|
import_statements, executed_variables, special_functions_code, code = (
|
|
|
set(["NODE_CLASS_MAPPINGS"]),
|
|
|
{},
|
|
|
[],
|
|
|
[],
|
|
|
)
|
|
|
|
|
|
initialized_objects = {}
|
|
|
|
|
|
custom_nodes = False
|
|
|
|
|
|
for idx, data, is_special_function in load_order:
|
|
|
|
|
|
inputs, class_type = data["inputs"], data["class_type"]
|
|
|
input_types = self.node_class_mappings[class_type].INPUT_TYPES()
|
|
|
class_def = self.node_class_mappings[class_type]()
|
|
|
|
|
|
|
|
|
missing_required_variable = False
|
|
|
if "required" in input_types.keys():
|
|
|
for required in input_types["required"]:
|
|
|
if required not in inputs.keys():
|
|
|
missing_required_variable = True
|
|
|
if missing_required_variable:
|
|
|
continue
|
|
|
|
|
|
|
|
|
if class_type not in initialized_objects:
|
|
|
|
|
|
if class_type == "PreviewImage":
|
|
|
continue
|
|
|
|
|
|
class_type, import_statement, class_code = self.get_class_info(
|
|
|
class_type
|
|
|
)
|
|
|
initialized_objects[class_type] = self.clean_variable_name(class_type)
|
|
|
if class_type in self.base_node_class_mappings.keys():
|
|
|
import_statements.add(import_statement)
|
|
|
if class_type not in self.base_node_class_mappings.keys():
|
|
|
custom_nodes = True
|
|
|
special_functions_code.append(class_code)
|
|
|
|
|
|
|
|
|
class_def_params = self.get_function_parameters(
|
|
|
getattr(class_def, class_def.FUNCTION)
|
|
|
)
|
|
|
no_params = class_def_params is None
|
|
|
|
|
|
|
|
|
inputs = {
|
|
|
key: value
|
|
|
for key, value in inputs.items()
|
|
|
if no_params or key in class_def_params
|
|
|
}
|
|
|
|
|
|
if (
|
|
|
"hidden" in input_types.keys()
|
|
|
and "unique_id" in input_types["hidden"].keys()
|
|
|
):
|
|
|
inputs["unique_id"] = random.randint(1, 2**64)
|
|
|
elif class_def_params is not None:
|
|
|
if "unique_id" in class_def_params:
|
|
|
inputs["unique_id"] = random.randint(1, 2**64)
|
|
|
|
|
|
|
|
|
executed_variables[idx] = f"{self.clean_variable_name(class_type)}_{idx}"
|
|
|
inputs = self.update_inputs(inputs, executed_variables)
|
|
|
|
|
|
if is_special_function:
|
|
|
special_functions_code.append(
|
|
|
self.create_function_call_code(
|
|
|
initialized_objects[class_type],
|
|
|
class_def.FUNCTION,
|
|
|
executed_variables[idx],
|
|
|
is_special_function,
|
|
|
**inputs,
|
|
|
)
|
|
|
)
|
|
|
else:
|
|
|
code.append(
|
|
|
self.create_function_call_code(
|
|
|
initialized_objects[class_type],
|
|
|
class_def.FUNCTION,
|
|
|
executed_variables[idx],
|
|
|
is_special_function,
|
|
|
**inputs,
|
|
|
)
|
|
|
)
|
|
|
|
|
|
|
|
|
final_code = self.assemble_python_code(
|
|
|
import_statements, special_functions_code, code, queue_size, custom_nodes
|
|
|
)
|
|
|
|
|
|
return final_code
|
|
|
|
|
|
def create_function_call_code(
|
|
|
self,
|
|
|
obj_name: str,
|
|
|
func: str,
|
|
|
variable_name: str,
|
|
|
is_special_function: bool,
|
|
|
**kwargs,
|
|
|
) -> str:
|
|
|
"""Generate Python code for a function call.
|
|
|
|
|
|
Args:
|
|
|
obj_name (str): The name of the initialized object.
|
|
|
func (str): The function to be called.
|
|
|
variable_name (str): The name of the variable that the function result should be assigned to.
|
|
|
is_special_function (bool): Determines the code indentation.
|
|
|
**kwargs: The keyword arguments for the function.
|
|
|
|
|
|
Returns:
|
|
|
str: The generated Python code.
|
|
|
"""
|
|
|
args = ", ".join(self.format_arg(key, value) for key, value in kwargs.items())
|
|
|
|
|
|
|
|
|
code = f"{variable_name} = {obj_name}.{func}({args})\n"
|
|
|
|
|
|
|
|
|
|
|
|
if not is_special_function:
|
|
|
code = f"\t{code}"
|
|
|
|
|
|
return code
|
|
|
|
|
|
def format_arg(self, key: str, value: any) -> str:
|
|
|
"""Formats arguments based on key and value.
|
|
|
|
|
|
Args:
|
|
|
key (str): Argument key.
|
|
|
value (any): Argument value.
|
|
|
|
|
|
Returns:
|
|
|
str: Formatted argument as a string.
|
|
|
"""
|
|
|
if key == "noise_seed" or key == "seed":
|
|
|
return f"{key}=random.randint(1, 2**64)"
|
|
|
elif isinstance(value, str):
|
|
|
value = value.replace("\n", "\\n").replace('"', "'")
|
|
|
return f'{key}="{value}"'
|
|
|
elif isinstance(value, dict) and "variable_name" in value:
|
|
|
return f'{key}={value["variable_name"]}'
|
|
|
return f"{key}={value}"
|
|
|
|
|
|
def assemble_python_code(
|
|
|
self,
|
|
|
import_statements: set,
|
|
|
speical_functions_code: List[str],
|
|
|
code: List[str],
|
|
|
queue_size: int,
|
|
|
custom_nodes=False,
|
|
|
) -> str:
|
|
|
"""Generates the final code string.
|
|
|
|
|
|
Args:
|
|
|
import_statements (set): A set of unique import statements.
|
|
|
speical_functions_code (List[str]): A list of special functions code strings.
|
|
|
code (List[str]): A list of code strings.
|
|
|
queue_size (int): Number of photos that will be generated by the script.
|
|
|
custom_nodes (bool): Whether to include custom nodes in the code.
|
|
|
|
|
|
Returns:
|
|
|
str: Generated final code as a string.
|
|
|
"""
|
|
|
|
|
|
func_strings = []
|
|
|
for func in [
|
|
|
get_value_at_index,
|
|
|
find_path,
|
|
|
add_comfyui_directory_to_sys_path,
|
|
|
add_extra_model_paths,
|
|
|
]:
|
|
|
func_strings.append(f"\n{inspect.getsource(func)}")
|
|
|
|
|
|
static_imports = (
|
|
|
[
|
|
|
"import os",
|
|
|
"import random",
|
|
|
"import sys",
|
|
|
"from typing import Sequence, Mapping, Any, Union",
|
|
|
"import torch",
|
|
|
]
|
|
|
+ func_strings
|
|
|
+ ["\n\nadd_comfyui_directory_to_sys_path()\nadd_extra_model_paths()\n"]
|
|
|
)
|
|
|
|
|
|
if custom_nodes:
|
|
|
static_imports.append(f"\n{inspect.getsource(import_custom_nodes)}\n")
|
|
|
custom_nodes = "import_custom_nodes()\n\t"
|
|
|
else:
|
|
|
custom_nodes = ""
|
|
|
|
|
|
imports_code = [
|
|
|
f"from nodes import {', '.join([class_name for class_name in import_statements])}"
|
|
|
]
|
|
|
|
|
|
main_function_code = (
|
|
|
"def main():\n\t"
|
|
|
+ f"{custom_nodes}with torch.inference_mode():\n\t\t"
|
|
|
+ "\n\t\t".join(speical_functions_code)
|
|
|
+ f"\n\n\t\tfor q in range({queue_size}):\n\t\t"
|
|
|
+ "\n\t\t".join(code)
|
|
|
)
|
|
|
|
|
|
final_code = "\n".join(
|
|
|
static_imports
|
|
|
+ imports_code
|
|
|
+ ["", main_function_code, "", 'if __name__ == "__main__":', "\tmain()"]
|
|
|
)
|
|
|
|
|
|
final_code = black.format_str(final_code, mode=black.Mode())
|
|
|
|
|
|
return final_code
|
|
|
|
|
|
def get_class_info(self, class_type: str) -> Tuple[str, str, str]:
|
|
|
"""Generates and returns necessary information about class type.
|
|
|
|
|
|
Args:
|
|
|
class_type (str): Class type.
|
|
|
|
|
|
Returns:
|
|
|
Tuple[str, str, str]: Updated class type, import statement string, class initialization code.
|
|
|
"""
|
|
|
import_statement = class_type
|
|
|
variable_name = self.clean_variable_name(class_type)
|
|
|
if class_type in self.base_node_class_mappings.keys():
|
|
|
class_code = f"{variable_name} = {class_type.strip()}()"
|
|
|
else:
|
|
|
class_code = f'{variable_name} = NODE_CLASS_MAPPINGS["{class_type}"]()'
|
|
|
|
|
|
return class_type, import_statement, class_code
|
|
|
|
|
|
@staticmethod
|
|
|
def clean_variable_name(class_type: str) -> str:
|
|
|
"""
|
|
|
Remove any characters from variable name that could cause errors running the Python script.
|
|
|
|
|
|
Args:
|
|
|
class_type (str): Class type.
|
|
|
|
|
|
Returns:
|
|
|
str: Cleaned variable name with no special characters or spaces
|
|
|
"""
|
|
|
|
|
|
clean_name = class_type.lower().strip().replace("-", "_").replace(" ", "_")
|
|
|
|
|
|
|
|
|
clean_name = re.sub(r"[^a-z0-9_]", "", clean_name)
|
|
|
|
|
|
|
|
|
if clean_name[0].isdigit():
|
|
|
clean_name = "_" + clean_name
|
|
|
|
|
|
return clean_name
|
|
|
|
|
|
def get_function_parameters(self, func: Callable) -> List:
|
|
|
"""Get the names of a function's parameters.
|
|
|
|
|
|
Args:
|
|
|
func (Callable): The function whose parameters we want to inspect.
|
|
|
|
|
|
Returns:
|
|
|
List: A list containing the names of the function's parameters.
|
|
|
"""
|
|
|
signature = inspect.signature(func)
|
|
|
parameters = {
|
|
|
name: param.default if param.default != param.empty else None
|
|
|
for name, param in signature.parameters.items()
|
|
|
}
|
|
|
catch_all = any(
|
|
|
param.kind == inspect.Parameter.VAR_KEYWORD
|
|
|
for param in signature.parameters.values()
|
|
|
)
|
|
|
return list(parameters.keys()) if not catch_all else None
|
|
|
|
|
|
def update_inputs(self, inputs: Dict, executed_variables: Dict) -> Dict:
|
|
|
"""Update inputs based on the executed variables.
|
|
|
|
|
|
Args:
|
|
|
inputs (Dict): Inputs dictionary to update.
|
|
|
executed_variables (Dict): Dictionary storing executed variable names.
|
|
|
|
|
|
Returns:
|
|
|
Dict: Updated inputs dictionary.
|
|
|
"""
|
|
|
for key in inputs.keys():
|
|
|
if (
|
|
|
isinstance(inputs[key], list)
|
|
|
and inputs[key][0] in executed_variables.keys()
|
|
|
):
|
|
|
inputs[key] = {
|
|
|
"variable_name": f"get_value_at_index({executed_variables[inputs[key][0]]}, {inputs[key][1]})"
|
|
|
}
|
|
|
return inputs
|
|
|
|
|
|
|
|
|
class ComfyUItoPython:
    """Main workflow to generate Python code from a workflow_api.json file.

    Attributes:
        input_file (str): Path to the input JSON file.
        output_file (str): Path to the output Python file.
        queue_size (int): The number of photos that will be created by the script.
        node_class_mappings (Dict): Mappings of node classes.
        base_node_class_mappings (Dict): Base mappings of node classes.
    """

    def __init__(
        self,
        workflow: str = "",
        input_file: str = "",
        output_file: str | TextIO = "",
        queue_size: int = 1,
        node_class_mappings: Dict = NODE_CLASS_MAPPINGS,
        needs_init_custom_nodes: bool = False,
    ):
        """Initialize the ComfyUItoPython class with the given parameters. Exactly one of workflow or input_file must be specified.

        Args:
            workflow (str): The workflow's JSON.
            input_file (str): Path to the input JSON file.
            output_file (str | TextIO): Path to the output file or a file-like object.
            queue_size (int): The number of times a workflow will be executed by the script. Defaults to 1.
            node_class_mappings (Dict): Mappings of node classes. Defaults to NODE_CLASS_MAPPINGS.
            needs_init_custom_nodes (bool): Whether to initialize custom nodes. Defaults to False.
        """
        # Exactly one workflow source (inline JSON or file path) must be given.
        if workflow and input_file:
            raise ValueError("Can't provide both input_file and workflow")
        if not (workflow or input_file):
            raise ValueError("Needs input_file or workflow")
        if not output_file:
            raise ValueError("Needs output_file")

        self.workflow = workflow
        self.input_file = input_file
        self.output_file = output_file
        self.queue_size = queue_size
        self.node_class_mappings = node_class_mappings
        self.needs_init_custom_nodes = needs_init_custom_nodes

        # Deep-copy so later mutation of the "base" mapping (see execute())
        # cannot affect the live node mapping.
        self.base_node_class_mappings = copy.deepcopy(self.node_class_mappings)
        self.execute()

    def execute(self):
        """Execute the main workflow to generate Python code.

        Returns:
            None
        """
        if self.needs_init_custom_nodes:
            import_custom_nodes()
        else:
            # Custom nodes were registered by the caller; with no "base"
            # mapping, every node resolves via NODE_CLASS_MAPPINGS at runtime.
            self.base_node_class_mappings = {}

        data = (
            FileHandler.read_json_file(self.input_file)
            if self.input_file
            else json.loads(self.workflow)
        )

        load_order = LoadOrderDeterminer(
            data, self.node_class_mappings
        ).determine_load_order()

        generated_code = CodeGenerator(
            self.node_class_mappings, self.base_node_class_mappings
        ).generate_workflow(load_order, queue_size=self.queue_size)

        FileHandler.write_code_to_file(self.output_file, generated_code)

        print(f"Code successfully generated and written to {self.output_file}")
|
|
|
|
|
|
|
|
|
def run(
    input_file: str = DEFAULT_INPUT_FILE,
    output_file: str = DEFAULT_OUTPUT_FILE,
    queue_size: int = DEFAULT_QUEUE_SIZE,
) -> None:
    """Generate Python code from a ComfyUI workflow_api.json file.

    Convenience wrapper around ComfyUItoPython that always initializes
    custom nodes before conversion.

    Args:
        input_file (str): Path to the input JSON file. Defaults to "workflow_api.json".
        output_file (str): Path to the output Python file.
            Defaults to "workflow_api.py".
        queue_size (int): The number of times a workflow will be executed by the script.
            Defaults to DEFAULT_QUEUE_SIZE (10).

    Returns:
        None
    """
    ComfyUItoPython(
        input_file=input_file,
        output_file=output_file,
        queue_size=queue_size,
        needs_init_custom_nodes=True,
    )
|
|
|
|
|
|
|
|
|
def main() -> None:
    """Main function to generate Python code from a ComfyUI workflow_api.json file."""
    parser = ArgumentParser(
        description="Generate Python code from a ComfyUI workflow_api.json file."
    )
    # (short flag, long flag, type, help text, default) for each CLI option.
    option_specs = [
        (
            "-f",
            "--input_file",
            str,
            "path to the input JSON file",
            DEFAULT_INPUT_FILE,
        ),
        (
            "-o",
            "--output_file",
            str,
            "path to the output Python file",
            DEFAULT_OUTPUT_FILE,
        ),
        (
            "-q",
            "--queue_size",
            int,
            "number of times the workflow will be executed by default",
            DEFAULT_QUEUE_SIZE,
        ),
    ]
    for short_flag, long_flag, arg_type, help_text, default in option_specs:
        parser.add_argument(
            short_flag,
            long_flag,
            type=arg_type,
            help=help_text,
            default=default,
        )
    pargs = parser.parse_args()
    run(**vars(pargs))
    print("Done.")
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
"""Run the main function."""
|
|
|
main()
|
|
|
|