|
import os
|
|
import random
|
|
import sys
|
|
from typing import Sequence, Mapping, Any, Union
|
|
import torch
|
|
from PIL import Image
|
|
|
|
|
|
def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Return the value at the given index of a sequence or mapping.

    For a sequence (list, tuple, string) this is plain indexing.  For a
    mapping, ``obj[index]`` is tried first (node outputs are sometimes dicts
    keyed by position); when that raises ``KeyError``, the value is looked up
    inside the mapping's ``"result"`` entry instead, since some ComfyUI nodes
    wrap their outputs as ``{"result": (...)}``.

    Args:
        obj (Union[Sequence, Mapping]): The object to retrieve the value from.
        index (int): The index of the value to retrieve.

    Returns:
        Any: The value at the given index.

    Raises:
        IndexError: If the index is out of bounds for the object and the
            object is not a mapping with a "result" entry.
    """
    try:
        return obj[index]
    except KeyError:
        # Fall back to the "result" wrapper convention used by some nodes.
        # (The original docstring said "results"; the code has always used
        # the singular "result" key.)
        return obj["result"][index]
|
|
|
|
|
|
def find_path(name: str, path: Union[str, None] = None) -> Union[str, None]:
    """Walk up parent directories from ``path`` looking for an entry ``name``.

    Starts at ``path`` (the current working directory when None) and checks
    each ancestor directory in turn until ``name`` is found or the filesystem
    root is reached.

    Args:
        name: File or directory name to look for.
        path: Directory to start the search from; defaults to os.getcwd().

    Returns:
        The full path to ``name`` as a string, or None if it was not found.
        (The previous docstring claimed a Path object was returned; the
        function has always returned a plain string.)
    """
    if path is None:
        path = os.getcwd()

    # Found directly in this directory?
    if name in os.listdir(path):
        path_name = os.path.join(path, name)
        print(f"{name} found: {path_name}")
        return path_name

    parent_directory = os.path.dirname(path)

    # os.path.dirname() of the root returns the root itself — stop there.
    if parent_directory == path:
        return None

    # Recurse into the parent directory.
    return find_path(name, parent_directory)
|
|
|
|
|
|
def add_comfyui_directory_to_sys_path() -> None:
    """Locate the ComfyUI checkout via find_path() and append it to sys.path."""
    comfyui_path = find_path("ComfyUI")
    # Only touch sys.path when we actually found an existing directory.
    if comfyui_path is None or not os.path.isdir(comfyui_path):
        return
    sys.path.append(comfyui_path)
    print(f"'{comfyui_path}' added to sys.path")
|
|
|
|
|
|
def add_extra_model_paths() -> None:
    """Parse the optional extra_model_paths.yaml file and load its paths."""
    # Imported lazily: `main` is ComfyUI's entry module and is only
    # importable after add_comfyui_directory_to_sys_path() has run.
    from main import load_extra_path_config

    config_path = find_path("extra_model_paths.yaml")

    if config_path is None:
        print("Could not find the extra_model_paths config file.")
    else:
        load_extra_path_config(config_path)
|
|
|
|
|
|
# Module-import side effects: make the ComfyUI package importable, then
# register any extra model paths.  Order matters — the second call imports
# from ComfyUI's `main` module, which requires the first to have run.
add_comfyui_directory_to_sys_path()
add_extra_model_paths()
|
|
|
|
|
|
def import_custom_nodes() -> None:
    """Initialise ComfyUI's custom nodes so NODE_CLASS_MAPPINGS is populated.

    Sets up a fresh asyncio event loop, instantiates the PromptServer and
    its PromptQueue (custom nodes may expect both to exist at import time),
    then runs ComfyUI's custom-node initialisation.
    """
    import asyncio

    import execution
    import server
    from nodes import init_custom_nodes

    # PromptServer requires an event loop at construction time.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    prompt_server = server.PromptServer(event_loop)
    # The queue registers itself on the server instance; the object itself
    # does not need to be kept.
    execution.PromptQueue(prompt_server)

    init_custom_nodes()
|
|
|
|
|
|
def resize_image(image_path, output_path='ready.png'):
    """Shrink the image at ``image_path`` to fit within 1024x1024 and save it.

    ``Image.thumbnail`` only ever downscales, preserving aspect ratio, so
    images already within the bound are saved unchanged in size.

    Args:
        image_path: Path of the image to resize.
        output_path: Where to write the result (default "ready.png").

    Returns:
        str: ``output_path``, for convenient chaining by the caller.
    """
    max_size = (1024, 1024)
    with Image.open(image_path) as img:
        # Image.ANTIALIAS was removed in Pillow 10; since Pillow 2.7 it had
        # been an alias for the identical LANCZOS filter, which the original
        # code already fell back to.  Use LANCZOS directly instead of the
        # try/except-ANTIALIAS dance.
        img.thumbnail(max_size, Image.LANCZOS)
        img.save(output_path)
    return output_path
|
|
|
|
|
|
def calculate_new_size(original_width, original_height, target_ratio):
    """Return the (width, height) of the smallest canvas enclosing the image
    at the requested aspect ratio.

    Exactly one dimension is kept; the other is grown so that the canvas
    matches ``target_ratio``.

    Args:
        original_width: Source image width in pixels.
        original_height: Source image height in pixels.
        target_ratio: Desired aspect ratio as a "W:H" string, e.g. "1:1".

    Returns:
        tuple[int, int]: The new (width, height).
    """
    ratio_w, ratio_h = (int(part) for part in target_ratio.split(':'))
    wanted = ratio_w / ratio_h
    current = original_width / original_height

    if current > wanted:
        # Image is wider than the target ratio: keep the width, grow height.
        return original_width, int(original_width / wanted)

    # Image is taller than (or equal to) the target ratio: keep the height,
    # grow the width.
    return int(original_height * wanted), original_height
|
|
|
|
|
|
from nodes import (
|
|
LoadImage,
|
|
CLIPTextEncode,
|
|
VAEDecode,
|
|
InpaintModelConditioning,
|
|
UNETLoader,
|
|
VAELoader,
|
|
DualCLIPLoader,
|
|
NODE_CLASS_MAPPINGS,
|
|
SaveImage
|
|
)
|
|
|
|
|
|
def main(
    image_path: str,
    prompt: str = '',
    negative_prompt: str = '',
    mask_expand: int = 30,
    gaus_kernel_size: int = 100,
    gaus_sigma: int = 100,
    steps: int = 20,
    denoise: float = 1,
    cfg: float = 3,
    target_ratio: str = '1:1',
    quantity: int = 1
):
    """Outpaint an image to ``target_ratio`` with a Flux inpainting pipeline.

    The source image is shrunk to fit 1024x1024, padded symmetrically to the
    target aspect ratio with InpaintExtendOutpaint, and the padded border is
    filled by the Flux model conditioned on ``prompt``.

    Args:
        image_path: Path to the input image.
        prompt: Positive prompt describing the outpainted content.
        negative_prompt: Negative prompt.
        mask_expand: Pixels by which to grow the outpaint mask (GrowMask).
        gaus_kernel_size: Gaussian-blur kernel size applied to the mask.
        gaus_sigma: Gaussian-blur sigma applied to the mask.
        steps: Number of sampling steps.
        denoise: Denoise strength passed to the scheduler.
        cfg: Flux guidance strength.
        target_ratio: Desired aspect ratio as a "W:H" string.
        quantity: How many images to generate.

    Returns:
        list[str]: Relative "output/<filename>" paths of the saved images.
    """
    import_custom_nodes()
    with torch.inference_mode():
        image_path = resize_image(image_path)
        image = Image.open(image_path)
        w, h = image.size
        new_w, new_h = calculate_new_size(w, h, target_ratio)

        # Pixels to add in each dimension to reach the target canvas;
        # half is added on each side.
        diff_w = int(max(w, new_w) - min(w, new_w))
        # BUG FIX: the original computed max(w, new_h) here, mixing the
        # width into the height delta and producing a wrong padding amount
        # whenever w != h.
        diff_h = int(max(h, new_h) - min(h, new_h))

        add_w = int(diff_w // 2 if diff_w > 0 else 0)
        add_h = int(diff_h // 2 if diff_h > 0 else 0)

        print('===================================')
        print((new_w, new_h))
        print((add_w, add_h))
        print('===================================')

        ksamplerselect = NODE_CLASS_MAPPINGS["KSamplerSelect"]()
        ksamplerselect_7 = ksamplerselect.get_sampler(sampler_name="euler")

        dualcliploader = DualCLIPLoader()
        dualcliploader_16 = dualcliploader.load_clip(
            clip_name1="t5xxl_fp8_e4m3fn.safetensors",
            clip_name2="clip_l.safetensors",
            type="flux",
        )

        cliptextencode = CLIPTextEncode()
        cliptextencode_15 = cliptextencode.encode(
            text=prompt, clip=get_value_at_index(dualcliploader_16, 0)
        )

        fluxguidance = NODE_CLASS_MAPPINGS["FluxGuidance"]()
        fluxguidance_10 = fluxguidance.append(
            guidance=cfg, conditioning=get_value_at_index(cliptextencode_15, 0)
        )

        cliptextencode_11 = cliptextencode.encode(
            text=negative_prompt, clip=get_value_at_index(dualcliploader_16, 0)
        )

        vaeloader = VAELoader()
        vaeloader_17 = vaeloader.load_vae(vae_name="ae.sft")

        loadimage = LoadImage()
        loadimage_21 = loadimage.load_image(image=image_path)

        # Pad the canvas symmetrically; output index 1 is the mask that
        # covers the newly added border.
        inpaintextendoutpaint = NODE_CLASS_MAPPINGS["InpaintExtendOutpaint"]()
        inpaintextendoutpaint_25 = inpaintextendoutpaint.inpaint_extend(
            mode="pixels",
            expand_up_pixels=add_h,
            expand_up_factor=1,
            expand_down_pixels=add_h,
            expand_down_factor=1,
            expand_left_pixels=add_w,
            expand_left_factor=1,
            expand_right_pixels=add_w,
            expand_right_factor=1,
            image=get_value_at_index(loadimage_21, 0),
            mask=get_value_at_index(loadimage_21, 1),
        )

        growmask = NODE_CLASS_MAPPINGS["GrowMask"]()
        growmask_26 = growmask.expand_mask(
            expand=mask_expand,
            tapered_corners=True,
            mask=get_value_at_index(inpaintextendoutpaint_25, 1),
        )

        impactgaussianblurmask = NODE_CLASS_MAPPINGS["ImpactGaussianBlurMask"]()
        impactgaussianblurmask_27 = impactgaussianblurmask.doit(
            kernel_size=gaus_kernel_size, sigma=gaus_sigma,
            mask=get_value_at_index(growmask_26, 0)
        )

        inpaintmodelconditioning = InpaintModelConditioning()
        inpaintmodelconditioning_9 = inpaintmodelconditioning.encode(
            positive=get_value_at_index(fluxguidance_10, 0),
            negative=get_value_at_index(cliptextencode_11, 0),
            vae=get_value_at_index(vaeloader_17, 0),
            pixels=get_value_at_index(inpaintextendoutpaint_25, 0),
            mask=get_value_at_index(impactgaussianblurmask_27, 0),
        )

        unetloader = UNETLoader()
        unetloader_19 = unetloader.load_unet(
            unet_name="flux1-dev.sft", weight_dtype="fp8_e4m3fn"
        )

        randomnoise = NODE_CLASS_MAPPINGS["RandomNoise"]()
        differentialdiffusion = NODE_CLASS_MAPPINGS["DifferentialDiffusion"]()
        basicguider = NODE_CLASS_MAPPINGS["BasicGuider"]()
        basicscheduler = NODE_CLASS_MAPPINGS["BasicScheduler"]()
        samplercustomadvanced = NODE_CLASS_MAPPINGS["SamplerCustomAdvanced"]()
        vaedecode = VAEDecode()
        saveimage = SaveImage()

        result = []
        for q in range(quantity):
            # BUG FIX: noise used to be drawn once before the loop, so every
            # iteration sampled from identical noise and quantity > 1 produced
            # duplicate images.  Draw fresh noise per image instead.
            # NOTE(review): randint's upper bound is inclusive; confirm the
            # RandomNoise node accepts a seed of exactly 2**64.
            randomnoise_4 = randomnoise.get_noise(
                noise_seed=random.randint(1, 2**64)
            )

            differentialdiffusion_6 = differentialdiffusion.apply(
                model=get_value_at_index(unetloader_19, 0)
            )

            basicguider_5 = basicguider.get_guider(
                model=get_value_at_index(differentialdiffusion_6, 0),
                conditioning=get_value_at_index(inpaintmodelconditioning_9, 0),
            )

            basicscheduler_8 = basicscheduler.get_sigmas(
                scheduler="simple",
                steps=steps,
                denoise=denoise,
                model=get_value_at_index(unetloader_19, 0),
            )

            samplercustomadvanced_1 = samplercustomadvanced.sample(
                noise=get_value_at_index(randomnoise_4, 0),
                guider=get_value_at_index(basicguider_5, 0),
                sampler=get_value_at_index(ksamplerselect_7, 0),
                sigmas=get_value_at_index(basicscheduler_8, 0),
                latent_image=get_value_at_index(inpaintmodelconditioning_9, 2),
            )

            vaedecode_2 = vaedecode.decode(
                samples=get_value_at_index(samplercustomadvanced_1, 0),
                vae=get_value_at_index(vaeloader_17, 0),
            )

            saveimage_207 = saveimage.save_images(
                filename_prefix="ComfyUI",
                images=get_value_at_index(vaedecode_2, 0)
            )
            print(saveimage_207)

            # BUG FIX: `filename` was computed but never used — the original
            # appended a literal placeholder string instead of the actual
            # saved file's path.
            filename = saveimage_207['ui']['images'][0]['filename']
            result.append(f'output/{filename}')

        return result
|
|
|
|
|
|
if __name__ == "__main__":
    # BUG FIX: the script called main() with no arguments, but `image_path`
    # is a required parameter, so it always crashed with a TypeError.
    # Take the image path from the command line instead.
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} <image_path>")
    main(sys.argv[1])
|
|
|