Spaces:
Runtime error
Runtime error
| import os | |
| from typing import Sequence, Mapping, Any, Union | |
| import sys | |
| sys.path.append('../') | |
| args = None | |
| def import_custom_nodes() -> None: | |
| """Find all custom nodes in the custom_nodes folder and add those node objects to NODE_CLASS_MAPPINGS | |
| This function sets up a new asyncio event loop, initializes the PromptServer, | |
| creates a PromptQueue, and initializes the custom nodes. | |
| """ | |
| import asyncio | |
| import execution | |
| from nodes import init_extra_nodes | |
| import server | |
| # Creating a new event loop and setting it as the default loop | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| # Creating an instance of PromptServer with the loop | |
| server_instance = server.PromptServer(loop) | |
| execution.PromptQueue(server_instance) | |
| # Initializing custom nodes | |
| init_extra_nodes(init_custom_nodes=True) | |
| def find_path(name: str, path: str = None) -> str: | |
| """ | |
| Recursively looks at parent folders starting from the given path until it finds the given name. | |
| Returns the path as a Path object if found, or None otherwise. | |
| """ | |
| # If no path is given, use the current working directory | |
| if path is None: | |
| if args is None or args.comfyui_directory is None: | |
| path = os.getcwd() | |
| else: | |
| path = args.comfyui_directory | |
| # Check if the current directory contains the name | |
| if name in os.listdir(path): | |
| path_name = os.path.join(path, name) | |
| print(f"{name} found: {path_name}") | |
| return path_name | |
| # Get the parent directory | |
| parent_directory = os.path.dirname(path) | |
| # If the parent directory is the same as the current directory, we've reached the root and stop the search | |
| if parent_directory == path: | |
| return None | |
| # Recursively call the function with the parent directory | |
| return find_path(name, parent_directory) | |
| def add_comfyui_directory_to_sys_path() -> None: | |
| """ | |
| Add 'ComfyUI' to the sys.path | |
| """ | |
| comfyui_path = find_path('ComfyUI') | |
| if comfyui_path is not None and os.path.isdir(comfyui_path): | |
| sys.path.append(comfyui_path) | |
| import __main__ | |
| if getattr(__main__, "__file__", None) is None: | |
| __main__.__file__ = os.path.join(comfyui_path, "main.py") | |
| print(f"'{comfyui_path}' added to sys.path") | |
| def add_extra_model_paths() -> None: | |
| """ | |
| Parse the optional extra_model_paths.yaml file and add the parsed paths to the sys.path. | |
| """ | |
| from utils.extra_config import load_extra_path_config | |
| extra_model_paths = find_path("extra_model_paths.yaml") | |
| if extra_model_paths is not None: | |
| load_extra_path_config(extra_model_paths) | |
| else: | |
| print("Could not find the extra_model_paths config file.") | |
| def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any: | |
| """Returns the value at the given index of a sequence or mapping. | |
| If the object is a sequence (like list or string), returns the value at the given index. | |
| If the object is a mapping (like a dictionary), returns the value at the index-th key. | |
| Some return a dictionary, in these cases, we look for the "results" key | |
| Args: | |
| obj (Union[Sequence, Mapping]): The object to retrieve the value from. | |
| index (int): The index of the value to retrieve. | |
| Returns: | |
| Any: The value at the given index. | |
| Raises: | |
| IndexError: If the index is out of bounds for the object and the object is not a mapping. | |
| """ | |
| try: | |
| return obj[index] | |
| except KeyError: | |
| return obj['result'][index] | |
| def parse_arg(s: Any): | |
| """ Parses a JSON string, returning it unchanged if the parsing fails. """ | |
| if __name__ == "__main__" or not isinstance(s, str): | |
| return s | |
| try: | |
| return json.loads(s) | |
| except json.JSONDecodeError: | |
| return s | |
| def save_image_wrapper(context, cls): | |
| if args.output is None: return cls | |
| from PIL import Image, ImageOps, ImageSequence | |
| from PIL.PngImagePlugin import PngInfo | |
| import numpy as np | |
| class WrappedSaveImage(cls): | |
| counter = 0 | |
| def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): | |
| if args.output is None: | |
| return super().save_images(images, filename_prefix, prompt, extra_pnginfo) | |
| else: | |
| if len(images) > 1 and args.output == "-": | |
| raise ValueError("Cannot save multiple images to stdout") | |
| filename_prefix += self.prefix_append | |
| results = list() | |
| for (batch_number, image) in enumerate(images): | |
| i = 255. * image.cpu().numpy() | |
| img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8)) | |
| metadata = None | |
| if not args.disable_metadata: | |
| metadata = PngInfo() | |
| if prompt is not None: | |
| metadata.add_text("prompt", json.dumps(prompt)) | |
| if extra_pnginfo is not None: | |
| for x in extra_pnginfo: | |
| metadata.add_text(x, json.dumps(extra_pnginfo[x])) | |
| if args.output == "-": | |
| # Hack to briefly restore stdout | |
| if context is not None: | |
| context.__exit__(None, None, None) | |
| try: | |
| img.save(sys.stdout.buffer, format="png", pnginfo=metadata, compress_level=self.compress_level) | |
| finally: | |
| if context is not None: | |
| context.__enter__() | |
| else: | |
| subfolder = "" | |
| if len(images) == 1: | |
| if os.path.isdir(args.output): | |
| subfolder = args.output | |
| file = "output.png" | |
| else: | |
| subfolder, file = os.path.split(args.output) | |
| if subfolder == "": | |
| subfolder = os.getcwd() | |
| else: | |
| if os.path.isdir(args.output): | |
| subfolder = args.output | |
| file = filename_prefix | |
| else: | |
| subfolder, file = os.path.split(args.output) | |
| if subfolder == "": | |
| subfolder = os.getcwd() | |
| files = os.listdir(subfolder) | |
| file_pattern = file | |
| while True: | |
| filename_with_batch_num = file_pattern.replace("%batch_num%", str(batch_number)) | |
| file = f"{filename_with_batch_num}_{self.counter:05}.png" | |
| self.counter += 1 | |
| if file not in files: | |
| break | |
| img.save(os.path.join(subfolder, file), pnginfo=metadata, compress_level=self.compress_level) | |
| print("Saved image to", os.path.join(subfolder, file)) | |
| results.append({ | |
| "filename": file, | |
| "subfolder": subfolder, | |
| "type": self.type | |
| }) | |
| return {"ui": {"images": results}} | |
| return WrappedSaveImage | |