gartajackhats1985's picture
Upload 1830 files
07f408f verified
import sys, os
from .utils import here, define_preprocessor_inputs, INPUT
from pathlib import Path
import traceback
import importlib
from .log import log, blue_text, cyan_text, get_summary, get_label
from .hint_image_enchance import NODE_CLASS_MAPPINGS as HIE_NODE_CLASS_MAPPINGS
from .hint_image_enchance import NODE_DISPLAY_NAME_MAPPINGS as HIE_NODE_DISPLAY_NAME_MAPPINGS
#Ref: https://github.com/comfyanonymous/ComfyUI/blob/76d53c4622fc06372975ed2a43ad345935b8a551/nodes.py#L17
sys.path.insert(0, str(Path(here, "src").resolve()))
for pkg_name in ["custom_controlnet_aux", "custom_mmpkg"]:
sys.path.append(str(Path(here, "src", pkg_name).resolve()))
#Enable CPU fallback for ops not being supported by MPS like upsample_bicubic2d.out
#https://github.com/pytorch/pytorch/issues/77764
#https://github.com/Fannovel16/comfyui_controlnet_aux/issues/2#issuecomment-1763579485
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = os.getenv("PYTORCH_ENABLE_MPS_FALLBACK", '1')
def load_nodes():
shorted_errors = []
full_error_messages = []
node_class_mappings = {}
node_display_name_mappings = {}
for filename in (here / "node_wrappers").iterdir():
module_name = filename.stem
if module_name.startswith('.'): continue #Skip hidden files created by the OS (e.g. [.DS_Store](https://en.wikipedia.org/wiki/.DS_Store))
try:
module = importlib.import_module(
f".node_wrappers.{module_name}", package=__package__
)
node_class_mappings.update(getattr(module, "NODE_CLASS_MAPPINGS"))
if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS"):
node_display_name_mappings.update(getattr(module, "NODE_DISPLAY_NAME_MAPPINGS"))
log.debug(f"Imported {module_name} nodes")
except AttributeError:
pass # wip nodes
except Exception:
error_message = traceback.format_exc()
full_error_messages.append(error_message)
error_message = error_message.splitlines()[-1]
shorted_errors.append(
f"Failed to import module {module_name} because {error_message}"
)
if len(shorted_errors) > 0:
full_err_log = '\n\n'.join(full_error_messages)
print(f"\n\nFull error log from comfyui_controlnet_aux: \n{full_err_log}\n\n")
log.info(
f"Some nodes failed to load:\n\t"
+ "\n\t".join(shorted_errors)
+ "\n\n"
+ "Check that you properly installed the dependencies.\n"
+ "If you think this is a bug, please report it on the github page (https://github.com/Fannovel16/comfyui_controlnet_aux/issues)"
)
return node_class_mappings, node_display_name_mappings
AUX_NODE_MAPPINGS, AUX_DISPLAY_NAME_MAPPINGS = load_nodes()
#For nodes not mapping image to image or has special requirements
AIO_NOT_SUPPORTED = ["InpaintPreprocessor", "MeshGraphormer+ImpactDetector-DepthMapPreprocessor", "DiffusionEdge_Preprocessor"]
AIO_NOT_SUPPORTED += ["SavePoseKpsAsJsonFile", "FacialPartColoringFromPoseKps", "UpperBodyTrackingFromPoseKps", "RenderPeopleKps", "RenderAnimalKps"]
AIO_NOT_SUPPORTED += ["Unimatch_OptFlowPreprocessor", "MaskOptFlow"]
def preprocessor_options():
auxs = list(AUX_NODE_MAPPINGS.keys())
auxs.insert(0, "none")
for name in AIO_NOT_SUPPORTED:
if name in auxs:
auxs.remove(name)
return auxs
PREPROCESSOR_OPTIONS = preprocessor_options()
class AIO_Preprocessor:
@classmethod
def INPUT_TYPES(s):
return define_preprocessor_inputs(
preprocessor=INPUT.COMBO(PREPROCESSOR_OPTIONS, default="none"),
resolution=INPUT.RESOLUTION()
)
RETURN_TYPES = ("IMAGE",)
FUNCTION = "execute"
CATEGORY = "ControlNet Preprocessors"
def execute(self, preprocessor, image, resolution=512):
if preprocessor == "none":
return (image, )
else:
aux_class = AUX_NODE_MAPPINGS[preprocessor]
input_types = aux_class.INPUT_TYPES()
input_types = {
**input_types["required"],
**(input_types["optional"] if "optional" in input_types else {})
}
params = {}
for name, input_type in input_types.items():
if name == "image":
params[name] = image
continue
if name == "resolution":
params[name] = resolution
continue
if len(input_type) == 2 and ("default" in input_type[1]):
params[name] = input_type[1]["default"]
continue
default_values = { "INT": 0, "FLOAT": 0.0 }
if input_type[0] in default_values:
params[name] = default_values[input_type[0]]
return getattr(aux_class(), aux_class.FUNCTION)(**params)
class ControlNetAuxSimpleAddText:
@classmethod
def INPUT_TYPES(s):
return dict(
required=dict(image=INPUT.IMAGE(), text=INPUT.STRING())
)
RETURN_TYPES = ("IMAGE",)
FUNCTION = "execute"
CATEGORY = "ControlNet Preprocessors"
def execute(self, image, text):
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import torch
font = ImageFont.truetype(str((here / "NotoSans-Regular.ttf").resolve()), 40)
img = Image.fromarray(image[0].cpu().numpy().__mul__(255.).astype(np.uint8))
ImageDraw.Draw(img).text((0,0), text, fill=(0,255,0), font=font)
return (torch.from_numpy(np.array(img)).unsqueeze(0) / 255.,)
class ExecuteAllControlNetPreprocessors:
@classmethod
def INPUT_TYPES(s):
return define_preprocessor_inputs(resolution=INPUT.RESOLUTION())
RETURN_TYPES = ("IMAGE",)
FUNCTION = "execute"
CATEGORY = "ControlNet Preprocessors"
def execute(self, image, resolution=512):
try:
from comfy_execution.graph_utils import GraphBuilder
except:
raise RuntimeError("ExecuteAllControlNetPreprocessor requries [Execution Model Inversion](https://github.com/comfyanonymous/ComfyUI/commit/5cfe38). Update ComfyUI/SwarmUI to get this feature")
graph = GraphBuilder()
curr_outputs = []
for preprocc in PREPROCESSOR_OPTIONS:
preprocc_node = graph.node("AIO_Preprocessor", preprocessor=preprocc, image=image, resolution=resolution)
hint_img = preprocc_node.out(0)
add_text_node = graph.node("ControlNetAuxSimpleAddText", image=hint_img, text=preprocc)
curr_outputs.append(add_text_node.out(0))
while len(curr_outputs) > 1:
_outputs = []
for i in range(0, len(curr_outputs), 2):
if i+1 < len(curr_outputs):
image_batch = graph.node("ImageBatch", image1=curr_outputs[i], image2=curr_outputs[i+1])
_outputs.append(image_batch.out(0))
else:
_outputs.append(curr_outputs[i])
curr_outputs = _outputs
return {
"result": (curr_outputs[0],),
"expand": graph.finalize(),
}
class ControlNetPreprocessorSelector:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"preprocessor": (PREPROCESSOR_OPTIONS,),
}
}
RETURN_TYPES = (PREPROCESSOR_OPTIONS,)
RETURN_NAMES = ("preprocessor",)
FUNCTION = "get_preprocessor"
CATEGORY = "ControlNet Preprocessors"
def get_preprocessor(self, preprocessor: str):
return (preprocessor,)
NODE_CLASS_MAPPINGS = {
**AUX_NODE_MAPPINGS,
"AIO_Preprocessor": AIO_Preprocessor,
"ControlNetPreprocessorSelector": ControlNetPreprocessorSelector,
**HIE_NODE_CLASS_MAPPINGS,
"ExecuteAllControlNetPreprocessors": ExecuteAllControlNetPreprocessors,
"ControlNetAuxSimpleAddText": ControlNetAuxSimpleAddText
}
NODE_DISPLAY_NAME_MAPPINGS = {
**AUX_DISPLAY_NAME_MAPPINGS,
"AIO_Preprocessor": "AIO Aux Preprocessor",
"ControlNetPreprocessorSelector": "Preprocessor Selector",
**HIE_NODE_DISPLAY_NAME_MAPPINGS,
"ExecuteAllControlNetPreprocessors": "Execute All ControlNet Preprocessors"
}