import sys, os
from .utils import here, define_preprocessor_inputs, INPUT
from pathlib import Path
import traceback
import importlib
from .log import log, blue_text, cyan_text, get_summary, get_label
from .hint_image_enchance import NODE_CLASS_MAPPINGS as HIE_NODE_CLASS_MAPPINGS
from .hint_image_enchance import NODE_DISPLAY_NAME_MAPPINGS as HIE_NODE_DISPLAY_NAME_MAPPINGS

# Ref: https://github.com/comfyanonymous/ComfyUI/blob/76d53c4622fc06372975ed2a43ad345935b8a551/nodes.py#L17
sys.path.insert(0, str(Path(here, "src").resolve()))
for pkg_name in ["custom_controlnet_aux", "custom_mmpkg"]:
    sys.path.append(str(Path(here, "src", pkg_name).resolve()))

# Enable CPU fallback for ops not supported by MPS, such as upsample_bicubic2d.out
# https://github.com/pytorch/pytorch/issues/77764
# https://github.com/Fannovel16/comfyui_controlnet_aux/issues/2#issuecomment-1763579485
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = os.getenv("PYTORCH_ENABLE_MPS_FALLBACK", '1')
def load_nodes():
    shortened_errors = []
    full_error_messages = []
    node_class_mappings = {}
    node_display_name_mappings = {}

    for filename in (here / "node_wrappers").iterdir():
        module_name = filename.stem
        if module_name.startswith('.'):
            continue  # Skip hidden files created by the OS (e.g. .DS_Store, https://en.wikipedia.org/wiki/.DS_Store)
        try:
            module = importlib.import_module(
                f".node_wrappers.{module_name}", package=__package__
            )
            node_class_mappings.update(getattr(module, "NODE_CLASS_MAPPINGS"))
            if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS"):
                node_display_name_mappings.update(getattr(module, "NODE_DISPLAY_NAME_MAPPINGS"))

            log.debug(f"Imported {module_name} nodes")
        except AttributeError:
            pass  # Work-in-progress wrappers that don't expose NODE_CLASS_MAPPINGS yet
        except Exception:
            error_message = traceback.format_exc()
            full_error_messages.append(error_message)
            error_message = error_message.splitlines()[-1]
            shortened_errors.append(
                f"Failed to import module {module_name} because {error_message}"
            )

    if len(shortened_errors) > 0:
        full_err_log = '\n\n'.join(full_error_messages)
        print(f"\n\nFull error log from comfyui_controlnet_aux: \n{full_err_log}\n\n")
        log.info(
            "Some nodes failed to load:\n\t"
            + "\n\t".join(shortened_errors)
            + "\n\n"
            + "Check that you properly installed the dependencies.\n"
            + "If you think this is a bug, please report it on the GitHub page (https://github.com/Fannovel16/comfyui_controlnet_aux/issues)"
        )

    return node_class_mappings, node_display_name_mappings

AUX_NODE_MAPPINGS, AUX_DISPLAY_NAME_MAPPINGS = load_nodes()

# Nodes that don't map image -> image, or that have special input requirements,
# and therefore can't be driven through the all-in-one preprocessor node.
AIO_NOT_SUPPORTED = ["InpaintPreprocessor", "MeshGraphormer+ImpactDetector-DepthMapPreprocessor", "DiffusionEdge_Preprocessor"]
AIO_NOT_SUPPORTED += ["SavePoseKpsAsJsonFile", "FacialPartColoringFromPoseKps", "UpperBodyTrackingFromPoseKps", "RenderPeopleKps", "RenderAnimalKps"]
AIO_NOT_SUPPORTED += ["Unimatch_OptFlowPreprocessor", "MaskOptFlow"]
def preprocessor_options():
    auxs = list(AUX_NODE_MAPPINGS.keys())
    auxs.insert(0, "none")
    for name in AIO_NOT_SUPPORTED:
        if name in auxs:
            auxs.remove(name)
    return auxs

PREPROCESSOR_OPTIONS = preprocessor_options()
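
# All-in-one node: looks up the selected preprocessor class in AUX_NODE_MAPPINGS and
# calls it with default parameters, so one node can expose every supported preprocessor.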
class AIO_Preprocessor:
    @classmethod
    def INPUT_TYPES(s):
        return define_preprocessor_inputs(
            preprocessor=INPUT.COMBO(PREPROCESSOR_OPTIONS, default="none"),
            resolution=INPUT.RESOLUTION()
        )

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "execute"
    CATEGORY = "ControlNet Preprocessors"

    def execute(self, preprocessor, image, resolution=512):
        if preprocessor == "none":
            return (image, )
        else:
            aux_class = AUX_NODE_MAPPINGS[preprocessor]
            input_types = aux_class.INPUT_TYPES()
            input_types = {
                **input_types["required"],
                **(input_types["optional"] if "optional" in input_types else {})
            }

            # Fill every input of the wrapped node: pass through the image and resolution,
            # use the declared default where available, otherwise fall back to a zero value.
            params = {}
            for name, input_type in input_types.items():
                if name == "image":
                    params[name] = image
                    continue
                if name == "resolution":
                    params[name] = resolution
                    continue
                if len(input_type) == 2 and ("default" in input_type[1]):
                    params[name] = input_type[1]["default"]
                    continue

                default_values = {"INT": 0, "FLOAT": 0.0}
                if input_type[0] in default_values:
                    params[name] = default_values[input_type[0]]

            return getattr(aux_class(), aux_class.FUNCTION)(**params)
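
# Helper node: stamps a green text label (e.g. a preprocessor name) onto the top-left
# corner of an image; used below to tag each preview in the "execute all" batch.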
class ControlNetAuxSimpleAddText:
    @classmethod
    def INPUT_TYPES(s):
        return dict(
            required=dict(image=INPUT.IMAGE(), text=INPUT.STRING())
        )

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "execute"
    CATEGORY = "ControlNet Preprocessors"

    def execute(self, image, text):
        from PIL import Image, ImageDraw, ImageFont
        import numpy as np
        import torch

        font = ImageFont.truetype(str((here / "NotoSans-Regular.ttf").resolve()), 40)
        img = Image.fromarray((image[0].cpu().numpy() * 255.).astype(np.uint8))
        ImageDraw.Draw(img).text((0, 0), text, fill=(0, 255, 0), font=font)
        return (torch.from_numpy(np.array(img)).unsqueeze(0) / 255.,)
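
# Expands into a subgraph that runs every supported preprocessor on the input image,
# labels each result with its name, and batches everything into a single IMAGE output.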
class ExecuteAllControlNetPreprocessors:
    @classmethod
    def INPUT_TYPES(s):
        return define_preprocessor_inputs(resolution=INPUT.RESOLUTION())

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "execute"
    CATEGORY = "ControlNet Preprocessors"

    def execute(self, image, resolution=512):
        try:
            from comfy_execution.graph_utils import GraphBuilder
        except ImportError:
            raise RuntimeError("ExecuteAllControlNetPreprocessors requires [Execution Model Inversion](https://github.com/comfyanonymous/ComfyUI/commit/5cfe38). Update ComfyUI/SwarmUI to get this feature")

        graph = GraphBuilder()

        # Run every supported preprocessor on the input image and label each result with its name.
        curr_outputs = []
        for preprocc in PREPROCESSOR_OPTIONS:
            preprocc_node = graph.node("AIO_Preprocessor", preprocessor=preprocc, image=image, resolution=resolution)
            hint_img = preprocc_node.out(0)
            add_text_node = graph.node("ControlNetAuxSimpleAddText", image=hint_img, text=preprocc)
            curr_outputs.append(add_text_node.out(0))

        # Merge the labelled hints pairwise with ImageBatch nodes until a single batch remains.
        while len(curr_outputs) > 1:
            _outputs = []
            for i in range(0, len(curr_outputs), 2):
                if i + 1 < len(curr_outputs):
                    image_batch = graph.node("ImageBatch", image1=curr_outputs[i], image2=curr_outputs[i+1])
                    _outputs.append(image_batch.out(0))
                else:
                    _outputs.append(curr_outputs[i])
            curr_outputs = _outputs

        return {
            "result": (curr_outputs[0],),
            "expand": graph.finalize(),
        }
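
# Utility node that simply outputs a preprocessor name, so the choice can be wired
# into other nodes (e.g. an AIO_Preprocessor) from one central selector.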
class ControlNetPreprocessorSelector:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "preprocessor": (PREPROCESSOR_OPTIONS,),
            }
        }

    RETURN_TYPES = (PREPROCESSOR_OPTIONS,)
    RETURN_NAMES = ("preprocessor",)
    FUNCTION = "get_preprocessor"
    CATEGORY = "ControlNet Preprocessors"

    def get_preprocessor(self, preprocessor: str):
        return (preprocessor,)
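
# Registries exported to ComfyUI: the dynamically loaded aux preprocessors plus the
# helper nodes defined in this file and in hint_image_enchance.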
NODE_CLASS_MAPPINGS = {
    **AUX_NODE_MAPPINGS,
    "AIO_Preprocessor": AIO_Preprocessor,
    "ControlNetPreprocessorSelector": ControlNetPreprocessorSelector,
    **HIE_NODE_CLASS_MAPPINGS,
    "ExecuteAllControlNetPreprocessors": ExecuteAllControlNetPreprocessors,
    "ControlNetAuxSimpleAddText": ControlNetAuxSimpleAddText,
}

NODE_DISPLAY_NAME_MAPPINGS = {
    **AUX_DISPLAY_NAME_MAPPINGS,
    "AIO_Preprocessor": "AIO Aux Preprocessor",
    "ControlNetPreprocessorSelector": "Preprocessor Selector",
    **HIE_NODE_DISPLAY_NAME_MAPPINGS,
    "ExecuteAllControlNetPreprocessors": "Execute All ControlNet Preprocessors",
}