daquanzhou
merge github repos and lfs track ckpt/path/safetensors/pt
613c9ab
raw
history blame
25.5 kB
import math
import impact.core as core
from impact.utils import *
from nodes import MAX_RESOLUTION
import nodes
from impact.impact_sampling import KSamplerWrapper, KSamplerAdvancedWrapper
class TiledKSamplerProvider:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"tile_width": ("INT", {"default": 512, "min": 320, "max": MAX_RESOLUTION, "step": 64}),
"tile_height": ("INT", {"default": 512, "min": 320, "max": MAX_RESOLUTION, "step": 64}),
"tiling_strategy": (["random", "padded", 'simple'], ),
"basic_pipe": ("BASIC_PIPE", )
}}
RETURN_TYPES = ("KSAMPLER",)
FUNCTION = "doit"
CATEGORY = "ImpactPack/Sampler"
def doit(self, seed, steps, cfg, sampler_name, scheduler, denoise,
tile_width, tile_height, tiling_strategy, basic_pipe):
model, _, _, positive, negative = basic_pipe
sampler = core.TiledKSamplerWrapper(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise,
tile_width, tile_height, tiling_strategy)
return (sampler, )
class KSamplerProvider:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"basic_pipe": ("BASIC_PIPE", )
},
}
RETURN_TYPES = ("KSAMPLER",)
FUNCTION = "doit"
CATEGORY = "ImpactPack/Sampler"
def doit(self, seed, steps, cfg, sampler_name, scheduler, denoise, basic_pipe):
model, _, _, positive, negative = basic_pipe
sampler = KSamplerWrapper(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, denoise)
return (sampler, )
class KSamplerAdvancedProvider:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
"sigma_factor": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
"basic_pipe": ("BASIC_PIPE", )
},
"optional": {
"sampler_opt": ("SAMPLER", )
}
}
RETURN_TYPES = ("KSAMPLER_ADVANCED",)
FUNCTION = "doit"
CATEGORY = "ImpactPack/Sampler"
def doit(self, cfg, sampler_name, scheduler, basic_pipe, sigma_factor=1.0, sampler_opt=None):
model, _, _, positive, negative = basic_pipe
sampler = KSamplerAdvancedWrapper(model, cfg, sampler_name, scheduler, positive, negative, sampler_opt=sampler_opt, sigma_factor=sigma_factor)
return (sampler, )
class TwoSamplersForMask:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"latent_image": ("LATENT", ),
"base_sampler": ("KSAMPLER", ),
"mask_sampler": ("KSAMPLER", ),
"mask": ("MASK", )
},
}
RETURN_TYPES = ("LATENT", )
FUNCTION = "doit"
CATEGORY = "ImpactPack/Sampler"
def doit(self, latent_image, base_sampler, mask_sampler, mask):
inv_mask = torch.where(mask != 1.0, torch.tensor(1.0), torch.tensor(0.0))
latent_image['noise_mask'] = inv_mask
new_latent_image = base_sampler.sample(latent_image)
new_latent_image['noise_mask'] = mask
new_latent_image = mask_sampler.sample(new_latent_image)
del new_latent_image['noise_mask']
return (new_latent_image, )
class TwoAdvancedSamplersForMask:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"samples": ("LATENT", ),
"base_sampler": ("KSAMPLER_ADVANCED", ),
"mask_sampler": ("KSAMPLER_ADVANCED", ),
"mask": ("MASK", ),
"overlap_factor": ("INT", {"default": 10, "min": 0, "max": 10000})
},
}
RETURN_TYPES = ("LATENT", )
FUNCTION = "doit"
CATEGORY = "ImpactPack/Sampler"
@staticmethod
def mask_erosion(samples, mask, grow_mask_by):
mask = mask.clone()
w = samples['samples'].shape[3]
h = samples['samples'].shape[2]
mask2 = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(w, h), mode="bilinear")
if grow_mask_by == 0:
mask_erosion = mask2
else:
kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by))
padding = math.ceil((grow_mask_by - 1) / 2)
mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask2.round(), kernel_tensor, padding=padding), 0, 1)
return mask_erosion[:, :, :w, :h].round()
def doit(self, seed, steps, denoise, samples, base_sampler, mask_sampler, mask, overlap_factor):
inv_mask = torch.where(mask != 1.0, torch.tensor(1.0), torch.tensor(0.0))
adv_steps = int(steps / denoise)
start_at_step = adv_steps - steps
new_latent_image = samples.copy()
mask_erosion = TwoAdvancedSamplersForMask.mask_erosion(samples, mask, overlap_factor)
for i in range(start_at_step, adv_steps):
add_noise = "enable" if i == start_at_step else "disable"
return_with_leftover_noise = "enable" if i+1 != adv_steps else "disable"
new_latent_image['noise_mask'] = inv_mask
new_latent_image = base_sampler.sample_advanced(add_noise, seed, adv_steps, new_latent_image, i, i + 1, "enable", recovery_mode="ratio additional")
new_latent_image['noise_mask'] = mask_erosion
new_latent_image = mask_sampler.sample_advanced("disable", seed, adv_steps, new_latent_image, i, i + 1, return_with_leftover_noise, recovery_mode="ratio additional")
del new_latent_image['noise_mask']
return (new_latent_image, )
class RegionalPrompt:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"mask": ("MASK", ),
"advanced_sampler": ("KSAMPLER_ADVANCED", ),
},
}
RETURN_TYPES = ("REGIONAL_PROMPTS", )
FUNCTION = "doit"
CATEGORY = "ImpactPack/Regional"
def doit(self, mask, advanced_sampler):
regional_prompt = core.REGIONAL_PROMPT(mask, advanced_sampler)
return ([regional_prompt], )
class CombineRegionalPrompts:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"regional_prompts1": ("REGIONAL_PROMPTS", ),
},
}
RETURN_TYPES = ("REGIONAL_PROMPTS", )
FUNCTION = "doit"
CATEGORY = "ImpactPack/Regional"
def doit(self, **kwargs):
res = []
for k, v in kwargs.items():
res += v
return (res, )
class CombineConditionings:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"conditioning1": ("CONDITIONING", ),
},
}
RETURN_TYPES = ("CONDITIONING", )
FUNCTION = "doit"
CATEGORY = "ImpactPack/Util"
def doit(self, **kwargs):
res = []
for k, v in kwargs.items():
res += v
return (res, )
class ConcatConditionings:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"conditioning1": ("CONDITIONING", ),
},
}
RETURN_TYPES = ("CONDITIONING", )
FUNCTION = "doit"
CATEGORY = "ImpactPack/Util"
def doit(self, **kwargs):
conditioning_to = list(kwargs.values())[0]
for k, conditioning_from in list(kwargs.items())[1:]:
out = []
if len(conditioning_from) > 1:
print("Warning: ConcatConditionings {k} contains more than 1 cond, only the first one will actually be applied to conditioning1.")
cond_from = conditioning_from[0][0]
for i in range(len(conditioning_to)):
t1 = conditioning_to[i][0]
tw = torch.cat((t1, cond_from), 1)
n = [tw, conditioning_to[i][1].copy()]
out.append(n)
conditioning_to = out
return (out, )
class RegionalSampler:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"seed_2nd": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"seed_2nd_mode": (["ignore", "fixed", "seed+seed_2nd", "seed-seed_2nd", "increment", "decrement", "randomize"], ),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"base_only_steps": ("INT", {"default": 2, "min": 0, "max": 10000}),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
"samples": ("LATENT", ),
"base_sampler": ("KSAMPLER_ADVANCED", ),
"regional_prompts": ("REGIONAL_PROMPTS", ),
"overlap_factor": ("INT", {"default": 10, "min": 0, "max": 10000}),
"restore_latent": ("BOOLEAN", {"default": True, "label_on": "enabled", "label_off": "disabled"}),
"additional_mode": (["DISABLE", "ratio additional", "ratio between"], {"default": "ratio between"}),
"additional_sampler": (["AUTO", "euler", "heun", "heunpp2", "dpm_2", "dpm_fast", "dpmpp_2m", "ddpm"],),
"additional_sigma_ratio": ("FLOAT", {"default": 0.3, "min": 0.0, "max": 1.0, "step": 0.01}),
},
"hidden": {"unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ("LATENT", )
FUNCTION = "doit"
CATEGORY = "ImpactPack/Regional"
@staticmethod
def mask_erosion(samples, mask, grow_mask_by):
mask = mask.clone()
w = samples['samples'].shape[3]
h = samples['samples'].shape[2]
mask2 = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(w, h), mode="bilinear")
if grow_mask_by == 0:
mask_erosion = mask2
else:
kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by))
padding = math.ceil((grow_mask_by - 1) / 2)
mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask2.round(), kernel_tensor, padding=padding), 0, 1)
return mask_erosion[:, :, :w, :h].round()
def doit(self, seed, seed_2nd, seed_2nd_mode, steps, base_only_steps, denoise, samples, base_sampler, regional_prompts, overlap_factor, restore_latent,
additional_mode, additional_sampler, additional_sigma_ratio, unique_id=None):
if restore_latent:
latent_compositor = nodes.NODE_CLASS_MAPPINGS['LatentCompositeMasked']()
else:
latent_compositor = None
masks = [regional_prompt.mask.numpy() for regional_prompt in regional_prompts]
masks = [np.ceil(mask).astype(np.int32) for mask in masks]
combined_mask = torch.from_numpy(np.bitwise_or.reduce(masks))
inv_mask = torch.where(combined_mask == 0, torch.tensor(1.0), torch.tensor(0.0))
adv_steps = int(steps / denoise)
start_at_step = adv_steps - steps
region_len = len(regional_prompts)
total = steps*region_len
leftover_noise = False
if base_only_steps > 0:
if seed_2nd_mode == 'ignore':
leftover_noise = True
samples = base_sampler.sample_advanced(True, seed, adv_steps, samples, start_at_step, start_at_step + base_only_steps, leftover_noise, recovery_mode="DISABLE")
if seed_2nd_mode == "seed+seed_2nd":
seed += seed_2nd
if seed > 1125899906842624:
seed = seed - 1125899906842624
elif seed_2nd_mode == "seed-seed_2nd":
seed -= seed_2nd
if seed < 0:
seed += 1125899906842624
elif seed_2nd_mode != 'ignore':
seed = seed_2nd
new_latent_image = samples.copy()
base_latent_image = None
if not leftover_noise:
add_noise = True
else:
add_noise = False
for i in range(start_at_step+base_only_steps, adv_steps):
core.update_node_status(unique_id, f"{i}/{steps} steps | ", ((i-start_at_step)*region_len)/total)
new_latent_image['noise_mask'] = inv_mask
new_latent_image = base_sampler.sample_advanced(add_noise, seed, adv_steps, new_latent_image, i, i + 1, True,
recovery_mode=additional_mode, recovery_sampler=additional_sampler, recovery_sigma_ratio=additional_sigma_ratio)
if restore_latent:
if 'noise_mask' in new_latent_image:
del new_latent_image['noise_mask']
base_latent_image = new_latent_image.copy()
j = 1
for regional_prompt in regional_prompts:
if restore_latent:
new_latent_image = base_latent_image.copy()
core.update_node_status(unique_id, f"{i}/{steps} steps | {j}/{region_len}", ((i-start_at_step)*region_len + j)/total)
region_mask = regional_prompt.get_mask_erosion(overlap_factor).squeeze(0).squeeze(0)
new_latent_image['noise_mask'] = region_mask
new_latent_image = regional_prompt.sampler.sample_advanced(False, seed, adv_steps, new_latent_image, i, i + 1, True,
recovery_mode=additional_mode, recovery_sampler=additional_sampler, recovery_sigma_ratio=additional_sigma_ratio)
if restore_latent:
del new_latent_image['noise_mask']
base_latent_image = latent_compositor.composite(base_latent_image, new_latent_image, 0, 0, False, region_mask)[0]
new_latent_image = base_latent_image
j += 1
add_noise = False
# finalize
core.update_node_status(unique_id, f"finalize")
if base_latent_image is not None:
new_latent_image = base_latent_image
else:
base_latent_image = new_latent_image
new_latent_image['noise_mask'] = inv_mask
new_latent_image = base_sampler.sample_advanced(False, seed, adv_steps, new_latent_image, adv_steps, adv_steps+1, False,
recovery_mode=additional_mode, recovery_sampler=additional_sampler, recovery_sigma_ratio=additional_sigma_ratio)
core.update_node_status(unique_id, f"{steps}/{steps} steps", total)
core.update_node_status(unique_id, "", None)
if restore_latent:
new_latent_image = base_latent_image
if 'noise_mask' in new_latent_image:
del new_latent_image['noise_mask']
return (new_latent_image, )
class RegionalSamplerAdvanced:
@classmethod
def INPUT_TYPES(s):
return {"required": {
"add_noise": ("BOOLEAN", {"default": True, "label_on": "enabled", "label_off": "disabled"}),
"noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}),
"end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}),
"overlap_factor": ("INT", {"default": 10, "min": 0, "max": 10000}),
"restore_latent": ("BOOLEAN", {"default": True, "label_on": "enabled", "label_off": "disabled"}),
"return_with_leftover_noise": ("BOOLEAN", {"default": False, "label_on": "enabled", "label_off": "disabled"}),
"latent_image": ("LATENT", ),
"base_sampler": ("KSAMPLER_ADVANCED", ),
"regional_prompts": ("REGIONAL_PROMPTS", ),
"additional_mode": (["DISABLE", "ratio additional", "ratio between"], {"default": "ratio between"}),
"additional_sampler": (["AUTO", "euler", "heun", "heunpp2", "dpm_2", "dpm_fast", "dpmpp_2m", "ddpm"],),
"additional_sigma_ratio": ("FLOAT", {"default": 0.3, "min": 0.0, "max": 1.0, "step": 0.01}),
},
"hidden": {"unique_id": "UNIQUE_ID"},
}
RETURN_TYPES = ("LATENT", )
FUNCTION = "doit"
CATEGORY = "ImpactPack/Regional"
def doit(self, add_noise, noise_seed, steps, start_at_step, end_at_step, overlap_factor, restore_latent, return_with_leftover_noise, latent_image, base_sampler, regional_prompts,
additional_mode, additional_sampler, additional_sigma_ratio, unique_id):
if restore_latent:
latent_compositor = nodes.NODE_CLASS_MAPPINGS['LatentCompositeMasked']()
else:
latent_compositor = None
masks = [regional_prompt.mask.numpy() for regional_prompt in regional_prompts]
masks = [np.ceil(mask).astype(np.int32) for mask in masks]
combined_mask = torch.from_numpy(np.bitwise_or.reduce(masks))
inv_mask = torch.where(combined_mask == 0, torch.tensor(1.0), torch.tensor(0.0))
region_len = len(regional_prompts)
end_at_step = min(steps, end_at_step)
total = (end_at_step - start_at_step) * region_len
new_latent_image = latent_image.copy()
base_latent_image = None
region_masks = {}
for i in range(start_at_step, end_at_step-1):
core.update_node_status(unique_id, f"{start_at_step+i}/{end_at_step} steps | ", ((i-start_at_step)*region_len)/total)
cur_add_noise = True if i == start_at_step and add_noise else False
new_latent_image['noise_mask'] = inv_mask
new_latent_image = base_sampler.sample_advanced(cur_add_noise, noise_seed, steps, new_latent_image, i, i + 1, True,
recovery_mode=additional_mode, recovery_sampler=additional_sampler, recovery_sigma_ratio=additional_sigma_ratio)
if restore_latent:
del new_latent_image['noise_mask']
base_latent_image = new_latent_image.copy()
j = 1
for regional_prompt in regional_prompts:
if restore_latent:
new_latent_image = base_latent_image.copy()
core.update_node_status(unique_id, f"{start_at_step+i}/{end_at_step} steps | {j}/{region_len}", ((i-start_at_step)*region_len + j)/total)
if j not in region_masks:
region_mask = regional_prompt.get_mask_erosion(overlap_factor).squeeze(0).squeeze(0)
region_masks[j] = region_mask
else:
region_mask = region_masks[j]
new_latent_image['noise_mask'] = region_mask
new_latent_image = regional_prompt.sampler.sample_advanced(False, noise_seed, steps, new_latent_image, i, i + 1, True,
recovery_mode=additional_mode, recovery_sampler=additional_sampler, recovery_sigma_ratio=additional_sigma_ratio)
if restore_latent:
del new_latent_image['noise_mask']
base_latent_image = latent_compositor.composite(base_latent_image, new_latent_image, 0, 0, False, region_mask)[0]
new_latent_image = base_latent_image
j += 1
# finalize
core.update_node_status(unique_id, f"finalize")
if base_latent_image is not None:
new_latent_image = base_latent_image
else:
base_latent_image = new_latent_image
new_latent_image['noise_mask'] = inv_mask
new_latent_image = base_sampler.sample_advanced(False, noise_seed, steps, new_latent_image, end_at_step-1, end_at_step, return_with_leftover_noise,
recovery_mode=additional_mode, recovery_sampler=additional_sampler, recovery_sigma_ratio=additional_sigma_ratio)
core.update_node_status(unique_id, f"{end_at_step}/{end_at_step} steps", total)
core.update_node_status(unique_id, "", None)
if restore_latent:
new_latent_image = base_latent_image
if 'noise_mask' in new_latent_image:
del new_latent_image['noise_mask']
return (new_latent_image, )
class KSamplerBasicPipe:
@classmethod
def INPUT_TYPES(s):
return {"required":
{"basic_pipe": ("BASIC_PIPE",),
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
"latent_image": ("LATENT", ),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
}
}
RETURN_TYPES = ("BASIC_PIPE", "LATENT", "VAE")
FUNCTION = "sample"
CATEGORY = "sampling"
def sample(self, basic_pipe, seed, steps, cfg, sampler_name, scheduler, latent_image, denoise=1.0):
model, clip, vae, positive, negative = basic_pipe
latent = nodes.KSampler().sample(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise)[0]
return basic_pipe, latent, vae
class KSamplerAdvancedBasicPipe:
@classmethod
def INPUT_TYPES(s):
return {"required":
{"basic_pipe": ("BASIC_PIPE",),
"add_noise": ("BOOLEAN", {"default": True, "label_on": "enable", "label_off": "disable"}),
"noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
"latent_image": ("LATENT", ),
"start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}),
"end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}),
"return_with_leftover_noise": ("BOOLEAN", {"default": False, "label_on": "enable", "label_off": "disable"}),
}
}
RETURN_TYPES = ("BASIC_PIPE", "LATENT", "VAE")
FUNCTION = "sample"
CATEGORY = "sampling"
def sample(self, basic_pipe, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0):
model, clip, vae, positive, negative = basic_pipe
if add_noise:
add_noise = "enable"
else:
add_noise = "disable"
if return_with_leftover_noise:
return_with_leftover_noise = "enable"
else:
return_with_leftover_noise = "disable"
latent = nodes.KSamplerAdvanced().sample(model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise)[0]
return basic_pipe, latent, vae