| from comfy_execution.graph_utils import GraphBuilder |
| from .tools import VariantSupport |
|
|
@VariantSupport()
class TestAccumulateNode:
    """Test node that appends a single value to an accumulation.

    In this file an accumulation is a dict of the form ``{"accum": [...]}``.
    """

    CATEGORY = "Testing/Lists"
    FUNCTION = "accumulate"
    RETURN_TYPES = ("ACCUMULATION",)

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare a required ``to_add`` value and an optional ``accumulation``."""
        required = {"to_add": ("*",)}
        optional = {"accumulation": ("ACCUMULATION",)}
        return {"required": required, "optional": optional}

    def accumulate(self, to_add, accumulation=None):
        """Return a new accumulation with ``to_add`` appended.

        Args:
            to_add: Value to append.
            accumulation: Existing ``{"accum": [...]}`` dict, or ``None`` to
                start a fresh accumulation.

        Returns:
            A one-element tuple holding the new accumulation dict. The
            incoming accumulation's list is not mutated; concatenation builds
            a new list.
        """
        previous = [] if accumulation is None else accumulation["accum"]
        return ({"accum": previous + [to_add]},)
|
|
@VariantSupport()
class TestAccumulationHeadNode:
    """Test node that splits the first item off an accumulation."""

    CATEGORY = "Testing/Lists"
    FUNCTION = "accumulation_head"
    RETURN_TYPES = ("ACCUMULATION", "*",)

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the single required ``accumulation`` input."""
        required = {"accumulation": ("ACCUMULATION",)}
        return {"required": required}

    def accumulation_head(self, accumulation):
        """Return ``(remaining accumulation, first item)``.

        Args:
            accumulation: Dict of the form ``{"accum": [...]}``.

        Returns:
            A two-element tuple. For a non-empty accumulation it holds a new
            accumulation without the first item, followed by that first item.
            For an empty accumulation it holds the input accumulation
            unchanged, followed by ``None``.
        """
        items = accumulation["accum"]
        if len(items) > 0:
            rest = items[1:]
            first = items[0]
            return ({"accum": rest}, first)
        return (accumulation, None)
|
|
# NOTE(review): unlike the other accumulation nodes in this file, this class
# is not wrapped in @VariantSupport(); confirm whether that is intentional.
class TestAccumulationTailNode:
    """Test node that splits the last item off an accumulation.

    An accumulation is a dict of the form ``{"accum": [...]}``.
    """

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the single required ``accumulation`` input."""
        return {
            "required": {
                "accumulation": ("ACCUMULATION",),
            },
        }

    RETURN_TYPES = ("ACCUMULATION", "*",)
    FUNCTION = "accumulation_tail"

    CATEGORY = "Testing/Lists"

    def accumulation_tail(self, accumulation):
        """Return ``(remaining accumulation, last item)``.

        Args:
            accumulation: Dict of the form ``{"accum": [...]}``.

        Returns:
            A two-element tuple ordered like ``RETURN_TYPES``. For a
            non-empty accumulation it holds a new accumulation without the
            last item, followed by that last item. For an empty accumulation
            it holds the input accumulation unchanged, followed by ``None``.
        """
        accum = accumulation["accum"]
        if len(accum) == 0:
            # The accumulation goes first and the (missing) item second, so
            # the tuple lines up with RETURN_TYPES and with the head node.
            return (accumulation, None)
        return ({"accum": accum[:-1]}, accum[-1])
|
|
@VariantSupport()
class TestAccumulationToListNode:
    """Test node that exposes an accumulation's items as a list output."""

    CATEGORY = "Testing/Lists"
    FUNCTION = "accumulation_to_list"
    RETURN_TYPES = ("*",)
    OUTPUT_IS_LIST = (True,)

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the single required ``accumulation`` input."""
        required = {"accumulation": ("ACCUMULATION",)}
        return {"required": required}

    def accumulation_to_list(self, accumulation):
        """Return the list stored under ``accumulation["accum"]``.

        The list is returned as-is (not copied), wrapped in a one-element
        tuple.
        """
        items = accumulation["accum"]
        return (items,)
|
|
@VariantSupport()
class TestListToAccumulationNode:
    """Test node that wraps a list input into an accumulation."""

    CATEGORY = "Testing/Lists"
    FUNCTION = "list_to_accumulation"
    RETURN_TYPES = ("ACCUMULATION",)
    INPUT_IS_LIST = (True,)

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the single required ``list`` input."""
        required = {"list": ("*",)}
        return {"required": required}

    # The parameter must stay named ``list`` (shadowing the builtin) because
    # it has to match the input key declared in INPUT_TYPES.
    def list_to_accumulation(self, list):
        """Return ``{"accum": list}`` wrapped in a one-element tuple.

        The incoming object is stored directly; it is not copied.
        """
        wrapped = {"accum": list}
        return (wrapped,)
|
|
@VariantSupport()
class TestAccumulationGetLengthNode:
    """Test node that reports how many items an accumulation holds."""

    CATEGORY = "Testing/Lists"
    FUNCTION = "accumlength"
    RETURN_TYPES = ("INT",)

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the single required ``accumulation`` input."""
        required = {"accumulation": ("ACCUMULATION",)}
        return {"required": required}

    def accumlength(self, accumulation):
        """Return ``len(accumulation['accum'])`` as a one-element tuple."""
        count = len(accumulation['accum'])
        return (count,)
| |
@VariantSupport()
class TestAccumulationGetItemNode:
    """Test node that reads one item of an accumulation by index."""

    CATEGORY = "Testing/Lists"
    FUNCTION = "get_item"
    RETURN_TYPES = ("*",)

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the required ``accumulation`` and ``index`` inputs."""
        required = {
            "accumulation": ("ACCUMULATION",),
            "index": ("INT", {"default": 0, "step": 1}),
        }
        return {"required": required}

    def get_item(self, accumulation, index):
        """Return ``accumulation['accum'][index]`` as a one-element tuple.

        Indexing is delegated to the stored sequence, so an out-of-range
        index propagates whatever error that sequence raises.
        """
        items = accumulation['accum']
        return (items[index],)
| |
@VariantSupport()
class TestAccumulationSetItemNode:
    """Test node that replaces one item of an accumulation by index."""

    CATEGORY = "Testing/Lists"
    FUNCTION = "set_item"
    RETURN_TYPES = ("ACCUMULATION",)

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the required ``accumulation``, ``index`` and ``value`` inputs."""
        required = {
            "accumulation": ("ACCUMULATION",),
            "index": ("INT", {"default": 0, "step": 1}),
            "value": ("*",),
        }
        return {"required": required}

    def set_item(self, accumulation, index, value):
        """Return a new accumulation with position ``index`` set to ``value``.

        The stored sequence is sliced into a copy before the assignment, so
        the incoming accumulation is left untouched.
        """
        # Full slice = shallow copy; the assignment below never touches the input.
        updated = accumulation['accum'][:]
        updated[index] = value
        result = {"accum": updated}
        return (result,)
|
|
class TestIntMathOperation:
    """Test node that applies one binary arithmetic operation to two ints."""

    # Dispatch table keyed by operation name. Insertion order is the order in
    # which the choices are listed by INPUT_TYPES.
    _OPERATIONS = {
        "add": lambda a, b: a + b,
        "subtract": lambda a, b: a - b,
        "multiply": lambda a, b: a * b,
        "divide": lambda a, b: a // b,  # floor division
        "modulo": lambda a, b: a % b,
        "power": lambda a, b: a ** b,  # a negative int exponent yields a float
    }

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the ``a`` and ``b`` operands and the ``operation`` choice."""
        limit = 0xffffffffffffffff
        int_options = {"default": 0, "min": -limit, "max": limit, "step": 1}
        return {
            "required": {
                # Each operand gets its own options dict so they never alias.
                "a": ("INT", dict(int_options)),
                "b": ("INT", dict(int_options)),
                "operation": (list(cls._OPERATIONS),),
            },
        }

    RETURN_TYPES = ("INT",)
    FUNCTION = "int_math_operation"

    CATEGORY = "Testing/Logic"

    def int_math_operation(self, a, b, operation):
        """Apply ``operation`` to ``a`` and ``b``.

        Args:
            a: Left operand.
            b: Right operand.
            operation: One of ``"add"``, ``"subtract"``, ``"multiply"``,
                ``"divide"`` (floor division), ``"modulo"`` or ``"power"``.

        Returns:
            A one-element tuple holding the result.

        Raises:
            ValueError: If ``operation`` is not one of the supported names.
                The original if/elif chain fell through and implicitly
                returned ``None`` in that case.
            ZeroDivisionError: For ``"divide"`` or ``"modulo"`` with ``b == 0``.
        """
        try:
            compute = self._OPERATIONS[operation]
        except KeyError:
            raise ValueError(f"Unsupported operation: {operation!r}") from None
        return (compute(a, b),)
|
|
|
|
| from .flow_control import NUM_FLOW_SOCKETS |
@VariantSupport()
class TestForLoopOpen:
    """Test node that opens a counted loop by expanding into a while-loop node.

    NOTE(review): the result/expand return shape looks like ComfyUI's node
    expansion convention; confirm against the execution docs.
    """

    CATEGORY = "Testing/Flow"
    FUNCTION = "for_loop_open"
    RETURN_TYPES = ("FLOW_CONTROL", "INT") + ("*",) * (NUM_FLOW_SOCKETS - 1)
    RETURN_NAMES = ("flow_control", "remaining") + tuple(
        f"value{i}" for i in range(1, NUM_FLOW_SOCKETS)
    )

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``remaining``, the optional carried values and hidden slot 0."""
        carried = {f"initial_value{i}": ("*",) for i in range(1, NUM_FLOW_SOCKETS)}
        return {
            "required": {
                "remaining": ("INT", {"default": 1, "min": 0, "max": 100000, "step": 1}),
            },
            "optional": carried,
            "hidden": {
                "initial_value0": ("*",)
            }
        }

    def for_loop_open(self, remaining, **kwargs):
        """Build the expansion graph for the opening half of the loop.

        Args:
            remaining: Iteration counter supplied by the user.
            **kwargs: Optional ``initial_value1`` .. ``initial_value{N-1}``
                inputs, plus the hidden ``initial_value0`` which, when
                present, replaces ``remaining``.

        Returns:
            A dict with ``"result"`` (a ``"stub"`` placeholder in the
            flow-control slot, the counter, then each carried value or
            ``None``) and ``"expand"`` (the finalized graph containing a
            single ``TestWhileLoopOpen`` node).
        """
        builder = GraphBuilder()
        # The hidden slot-0 value, when supplied, takes over as the counter.
        counter = kwargs.get("initial_value0", remaining)
        carried = {
            f"initial_value{i}": kwargs.get(f"initial_value{i}", None)
            for i in range(1, NUM_FLOW_SOCKETS)
        }
        # The node handle is not needed here; the call registers the node on the builder.
        builder.node("TestWhileLoopOpen", condition=counter, initial_value0=counter, **carried)
        return {
            "result": ("stub", counter, *carried.values()),
            "expand": builder.finalize(),
        }
|
|
@VariantSupport()
class TestForLoopClose:
    """Test node that closes a counted loop by expanding into helper nodes.

    NOTE(review): inputs are declared with ``rawLink``; presumably they
    arrive as ``[node_id, output_index]`` links rather than resolved values.
    Confirm against the executor.
    """

    CATEGORY = "Testing/Flow"
    FUNCTION = "for_loop_close"
    RETURN_TYPES = ("*",) * (NUM_FLOW_SOCKETS - 1)
    RETURN_NAMES = tuple(f"value{i}" for i in range(1, NUM_FLOW_SOCKETS))

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``flow_control`` and the optional carried values."""
        carried = {
            f"initial_value{i}": ("*", {"rawLink": True})
            for i in range(1, NUM_FLOW_SOCKETS)
        }
        return {
            "required": {
                "flow_control": ("FLOW_CONTROL", {"rawLink": True}),
            },
            "optional": carried,
        }

    def for_loop_close(self, flow_control, **kwargs):
        """Build the expansion graph for the closing half of the loop.

        The graph subtracts 1 from output 1 of the node referenced by
        ``flow_control[0]``, converts the result to a boolean condition, and
        feeds both into a ``TestWhileLoopClose`` node together with the
        carried values.

        Returns:
            A dict with ``"result"`` (outputs 1..N-1 of the while-close node)
            and ``"expand"`` (the finalized graph).
        """
        builder = GraphBuilder()
        opener_id = flow_control[0]
        # Output 1 of the referenced node is decremented by one.
        decremented = builder.node(
            "TestIntMathOperation",
            operation="subtract",
            a=[opener_id, 1],
            b=1,
        )
        keep_looping = builder.node("TestToBoolNode", value=decremented.out(0))
        carried = {
            f"initial_value{i}": kwargs.get(f"initial_value{i}", None)
            for i in range(1, NUM_FLOW_SOCKETS)
        }
        closer = builder.node(
            "TestWhileLoopClose",
            flow_control=flow_control,
            condition=keep_looping.out(0),
            initial_value0=decremented.out(0),
            **carried,
        )
        return {
            "result": tuple(closer.out(i) for i in range(1, NUM_FLOW_SOCKETS)),
            "expand": builder.finalize(),
        }
|
|
# Exclusive upper bound for the ``value{i}`` socket indices of TestMakeListNode.
NUM_LIST_SOCKETS = 10


@VariantSupport()
class TestMakeListNode:
    """Test node that gathers its connected ``value{i}`` inputs into a list."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        """Declare ``value1`` as required and ``value2`` onward as optional."""
        return {
            "required": {
                "value1": ("*",),
            },
            "optional": {
                # Start at 2: value1 is already declared as required above and
                # must not be declared a second time as optional.
                f"value{i}": ("*",) for i in range(2, NUM_LIST_SOCKETS)
            },
        }

    RETURN_TYPES = ("*",)
    FUNCTION = "make_list"
    OUTPUT_IS_LIST = (True,)

    CATEGORY = "Testing/Lists"

    def make_list(self, **kwargs):
        """Return the supplied ``value{i}`` inputs, in index order, as a list.

        Args:
            **kwargs: Any of ``value0`` .. ``value{NUM_LIST_SOCKETS - 1}``.
                Missing keys are skipped. ``value0`` is not declared in
                INPUT_TYPES but is still honoured if passed, as before.

        Returns:
            A one-element tuple holding the collected list.
        """
        socket_names = (f"value{i}" for i in range(NUM_LIST_SOCKETS))
        collected = [kwargs[name] for name in socket_names if name in kwargs]
        return (collected,)
|
|
# Single registry of (node key, node class, display name). Both public
# mappings below are derived from it so the two can never drift apart.
_UTILITY_NODE_REGISTRY = (
    ("TestAccumulateNode", TestAccumulateNode, "Accumulate"),
    ("TestAccumulationHeadNode", TestAccumulationHeadNode, "Accumulation Head"),
    ("TestAccumulationTailNode", TestAccumulationTailNode, "Accumulation Tail"),
    ("TestAccumulationToListNode", TestAccumulationToListNode, "Accumulation to List"),
    ("TestListToAccumulationNode", TestListToAccumulationNode, "List to Accumulation"),
    ("TestAccumulationGetLengthNode", TestAccumulationGetLengthNode, "Accumulation Get Length"),
    ("TestAccumulationGetItemNode", TestAccumulationGetItemNode, "Accumulation Get Item"),
    ("TestAccumulationSetItemNode", TestAccumulationSetItemNode, "Accumulation Set Item"),
    ("TestForLoopOpen", TestForLoopOpen, "For Loop Open"),
    ("TestForLoopClose", TestForLoopClose, "For Loop Close"),
    ("TestIntMathOperation", TestIntMathOperation, "Int Math Operation"),
    ("TestMakeListNode", TestMakeListNode, "Make List"),
)

# Node key -> node class.
UTILITY_NODE_CLASS_MAPPINGS = {
    key: node_cls for key, node_cls, _label in _UTILITY_NODE_REGISTRY
}

# Node key -> human-readable display name.
UTILITY_NODE_DISPLAY_NAME_MAPPINGS = {
    key: label for key, _node_cls, label in _UTILITY_NODE_REGISTRY
}
|
|