# DiffuseCraftMod / stablepy_model.py
import gc, time
import numpy as np
import PIL.Image
from diffusers import (
ControlNetModel,
DiffusionPipeline,
StableDiffusionControlNetPipeline,
StableDiffusionControlNetInpaintPipeline,
StableDiffusionPipeline,
AutoencoderKL,
StableDiffusionXLInpaintPipeline,
StableDiffusionXLAdapterPipeline,
T2IAdapter,
StableDiffusionXLPipeline,
AutoPipelineForImage2Image
)
from huggingface_hub import hf_hub_download
import torch, random, json
from controlnet_aux import (
CannyDetector,
ContentShuffleDetector,
HEDdetector,
LineartAnimeDetector,
LineartDetector,
MidasDetector,
MLSDdetector,
NormalBaeDetector,
OpenposeDetector,
PidiNetDetector,
)
from transformers import pipeline
from controlnet_aux.util import HWC3, ade_palette
from transformers import AutoImageProcessor, UperNetForSemanticSegmentation
import cv2
from diffusers import (
DPMSolverMultistepScheduler,
DPMSolverSinglestepScheduler,
KDPM2DiscreteScheduler,
EulerDiscreteScheduler,
EulerAncestralDiscreteScheduler,
HeunDiscreteScheduler,
LMSDiscreteScheduler,
DDIMScheduler,
DEISMultistepScheduler,
UniPCMultistepScheduler,
LCMScheduler,
PNDMScheduler,
KDPM2AncestralDiscreteScheduler,
EDMDPMSolverMultistepScheduler,
EDMEulerScheduler,
)
from .prompt_weights import get_embed_new, add_comma_after_pattern_ti
from .utils import save_pil_image_with_metadata
from .lora_loader import lora_mix_load
from .inpainting_canvas import draw, make_inpaint_condition
from .adetailer import ad_model_process
from ..upscalers.esrgan import UpscalerESRGAN, UpscalerLanczos, UpscalerNearest
from ..logging.logging_setup import logger
from .extra_model_loaders import custom_task_model_loader
from .high_resolution import process_images_high_resolution
from .style_prompt_config import styles_data, STYLE_NAMES, get_json_content, apply_style
import os
from compel import Compel, ReturnedEmbeddingsType
import ipywidgets as widgets, mediapy
from IPython.display import display
from PIL import Image
from typing import Union, Optional, List, Tuple, Dict, Any, Callable
import logging, diffusers, copy, warnings
logging.getLogger("diffusers").setLevel(logging.ERROR)
#logging.getLogger("transformers").setLevel(logging.ERROR)
diffusers.utils.logging.set_verbosity(40)
warnings.filterwarnings(action="ignore", category=FutureWarning, module="diffusers")
warnings.filterwarnings(action="ignore", category=FutureWarning, module="transformers")
# =====================================
# Utils preprocessor
# =====================================
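# resize_image scales the input so its longest side matches `resolution`, then snaps
# both dimensions to multiples of 64 (illustrative example: a 768x1024 (H x W) array at
# resolution=512 becomes 384x512), keeping sizes compatible with the diffusion UNet.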
def resize_image(input_image, resolution, interpolation=None):
H, W, C = input_image.shape
H = float(H)
W = float(W)
k = float(resolution) / max(H, W)
H *= k
W *= k
H = int(np.round(H / 64.0)) * 64
W = int(np.round(W / 64.0)) * 64
if interpolation is None:
interpolation = cv2.INTER_LANCZOS4 if k > 1 else cv2.INTER_AREA
img = cv2.resize(input_image, (W, H), interpolation=interpolation)
return img
class DepthEstimator:
def __init__(self):
self.model = pipeline("depth-estimation")
def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image:
detect_resolution = kwargs.pop("detect_resolution", 512)
image_resolution = kwargs.pop("image_resolution", 512)
image = np.array(image)
image = HWC3(image)
image = resize_image(image, resolution=detect_resolution)
image = PIL.Image.fromarray(image)
image = self.model(image)
image = image["depth"]
image = np.array(image)
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
return PIL.Image.fromarray(image)
class ImageSegmentor:
def __init__(self):
self.image_processor = AutoImageProcessor.from_pretrained(
"openmmlab/upernet-convnext-small"
)
self.image_segmentor = UperNetForSemanticSegmentation.from_pretrained(
"openmmlab/upernet-convnext-small"
)
@torch.inference_mode()
def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image:
detect_resolution = kwargs.pop("detect_resolution", 512)
image_resolution = kwargs.pop("image_resolution", 512)
image = HWC3(image)
image = resize_image(image, resolution=detect_resolution)
image = PIL.Image.fromarray(image)
pixel_values = self.image_processor(image, return_tensors="pt").pixel_values
outputs = self.image_segmentor(pixel_values)
seg = self.image_processor.post_process_semantic_segmentation(
outputs, target_sizes=[image.size[::-1]]
)[0]
color_seg = np.zeros((seg.shape[0], seg.shape[1], 3), dtype=np.uint8)
for label, color in enumerate(ade_palette()):
color_seg[seg == label, :] = color
color_seg = color_seg.astype(np.uint8)
color_seg = resize_image(
color_seg, resolution=image_resolution, interpolation=cv2.INTER_NEAREST
)
return PIL.Image.fromarray(color_seg)
class Preprocessor:
MODEL_ID = "lllyasviel/Annotators"
def __init__(self):
self.model = None
self.name = ""
def load(self, name: str) -> None:
if name == self.name:
return
if name == "HED":
self.model = HEDdetector.from_pretrained(self.MODEL_ID)
elif name == "Midas":
self.model = MidasDetector.from_pretrained(self.MODEL_ID)
elif name == "MLSD":
self.model = MLSDdetector.from_pretrained(self.MODEL_ID)
elif name == "Openpose":
self.model = OpenposeDetector.from_pretrained(self.MODEL_ID)
elif name == "PidiNet":
self.model = PidiNetDetector.from_pretrained(self.MODEL_ID)
elif name == "NormalBae":
self.model = NormalBaeDetector.from_pretrained(self.MODEL_ID)
elif name == "Lineart":
self.model = LineartDetector.from_pretrained(self.MODEL_ID)
elif name == "LineartAnime":
self.model = LineartAnimeDetector.from_pretrained(self.MODEL_ID)
elif name == "Canny":
self.model = CannyDetector()
elif name == "ContentShuffle":
self.model = ContentShuffleDetector()
elif name == "DPT":
self.model = DepthEstimator()
elif name == "UPerNet":
self.model = ImageSegmentor()
else:
            raise ValueError(f"Unknown preprocessor name: {name}")
torch.cuda.empty_cache()
gc.collect()
self.name = name
def __call__(self, image: PIL.Image.Image, **kwargs) -> PIL.Image.Image:
if self.name == "Canny":
if "detect_resolution" in kwargs:
detect_resolution = kwargs.pop("detect_resolution")
image = np.array(image)
image = HWC3(image)
image = resize_image(image, resolution=detect_resolution)
image = self.model(image, **kwargs)
return PIL.Image.fromarray(image)
elif self.name == "Midas":
detect_resolution = kwargs.pop("detect_resolution", 512)
image_resolution = kwargs.pop("image_resolution", 512)
image = np.array(image)
image = HWC3(image)
image = resize_image(image, resolution=detect_resolution)
image = self.model(image, **kwargs)
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
return PIL.Image.fromarray(image)
else:
return self.model(image, **kwargs)
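# Illustrative Preprocessor usage (detector name and threshold values are examples only):
#   preprocessor = Preprocessor()
#   preprocessor.load("Canny")
#   control_image = preprocessor(
#       pil_image, low_threshold=100, high_threshold=200,
#       detect_resolution=512, image_resolution=512,
#   )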
# =====================================
# Base Model
# =====================================
CONTROLNET_MODEL_IDS = {
"openpose": "lllyasviel/control_v11p_sd15_openpose",
"canny": "lllyasviel/control_v11p_sd15_canny",
"mlsd": "lllyasviel/control_v11p_sd15_mlsd",
"scribble": "lllyasviel/control_v11p_sd15_scribble",
"softedge": "lllyasviel/control_v11p_sd15_softedge",
"segmentation": "lllyasviel/control_v11p_sd15_seg",
"depth": "lllyasviel/control_v11f1p_sd15_depth",
"normalbae": "lllyasviel/control_v11p_sd15_normalbae",
"lineart": "lllyasviel/control_v11p_sd15_lineart",
"lineart_anime": "lllyasviel/control_v11p_sd15s2_lineart_anime",
"shuffle": "lllyasviel/control_v11e_sd15_shuffle",
"ip2p": "lllyasviel/control_v11e_sd15_ip2p",
"inpaint": "lllyasviel/control_v11p_sd15_inpaint",
"txt2img": "Nothinghere",
"sdxl_canny": "TencentARC/t2i-adapter-canny-sdxl-1.0",
"sdxl_sketch": "TencentARC/t2i-adapter-sketch-sdxl-1.0",
"sdxl_lineart": "TencentARC/t2i-adapter-lineart-sdxl-1.0",
"sdxl_depth-midas": "TencentARC/t2i-adapter-depth-midas-sdxl-1.0",
"sdxl_openpose": "TencentARC/t2i-adapter-openpose-sdxl-1.0",
#"sdxl_depth-zoe": "TencentARC/t2i-adapter-depth-zoe-sdxl-1.0",
#"sdxl_recolor": "TencentARC/t2i-adapter-recolor-sdxl-1.0",
"img2img": "Nothinghere",
}
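# Maps task names to SD 1.5 ControlNet repositories and SDXL T2I-Adapter repositories.
# "Nothinghere" is a placeholder for tasks (txt2img, img2img) that load no control model.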
# def download_all_controlnet_weights() -> None:
# for model_id in CONTROLNET_MODEL_IDS.values():
# ControlNetModel.from_pretrained(model_id)
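# Maps UI sampler names to (scheduler class, extra config kwargs); for example,
# "DPM++ 2M Karras" yields (DPMSolverMultistepScheduler, {"use_karras_sigmas": True}),
# which get_scheduler() below instantiates via scheduler_class.from_config(base_config, **config).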
SCHEDULER_CONFIG_MAP = {
"DPM++ 2M": (DPMSolverMultistepScheduler, {}),
"DPM++ 2M Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True}),
"DPM++ 2M SDE": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++"}),
"DPM++ 2M SDE Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True, "algorithm_type": "sde-dpmsolver++"}),
"DPM++ SDE": (DPMSolverSinglestepScheduler, {}),
"DPM++ SDE Karras": (DPMSolverSinglestepScheduler, {"use_karras_sigmas": True}),
"DPM2": (KDPM2DiscreteScheduler, {}),
"DPM2 Karras": (KDPM2DiscreteScheduler, {"use_karras_sigmas": True}),
"DPM2 a" : (KDPM2AncestralDiscreteScheduler, {}),
"DPM2 a Karras" : (KDPM2AncestralDiscreteScheduler, {"use_karras_sigmas": True}),
"Euler": (EulerDiscreteScheduler, {}),
"Euler a": (EulerAncestralDiscreteScheduler, {}),
"Heun": (HeunDiscreteScheduler, {}),
"LMS": (LMSDiscreteScheduler, {}),
"LMS Karras": (LMSDiscreteScheduler, {"use_karras_sigmas": True}),
"DDIM": (DDIMScheduler, {}),
"DEIS": (DEISMultistepScheduler, {}),
"UniPC": (UniPCMultistepScheduler, {}),
"PNDM" : (PNDMScheduler, {}),
"DPM++ 2M Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True}),
"DPM++ 2M Ef": (DPMSolverMultistepScheduler, {"euler_at_final": True}),
"DPM++ 2M SDE Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True, "algorithm_type": "sde-dpmsolver++"}),
"DPM++ 2M SDE Ef": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++", "euler_at_final": True}),
"EDMDPM": (EDMDPMSolverMultistepScheduler, {}),
"EDMEuler": (EDMEulerScheduler, {}),
"LCM" : (LCMScheduler, {}),
}
scheduler_names = list(SCHEDULER_CONFIG_MAP.keys())
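# process_prompts_valid() below falls back to the main prompt/negative prompt whenever the
# task-specific ones are empty; e.g. ("", None, "a cat", "blurry") -> (True, True, "a cat", "blurry").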
def process_prompts_valid(specific_prompt, specific_negative_prompt, prompt, negative_prompt):
specific_prompt_empty = (specific_prompt in [None, ""])
specific_negative_prompt_empty = (specific_negative_prompt in [None, ""])
prompt_valid = prompt if specific_prompt_empty else specific_prompt
negative_prompt_valid = negative_prompt if specific_negative_prompt_empty else specific_negative_prompt
return specific_prompt_empty, specific_negative_prompt_empty, prompt_valid, negative_prompt_valid
class Model_Diffusers:
def __init__(
self,
base_model_id: str = "runwayml/stable-diffusion-v1-5",
task_name: str = "txt2img",
vae_model=None,
type_model_precision=torch.float16,
sdxl_safetensors = False,
):
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
self.base_model_id = ""
self.task_name = ""
self.vae_model = None
self.type_model_precision = (
type_model_precision if torch.cuda.is_available() else torch.float32
) # For SD 1.5
self.load_pipe(
base_model_id, task_name, vae_model, type_model_precision, sdxl_safetensors = sdxl_safetensors
)
self.preprocessor = Preprocessor()
self.styles_data = styles_data
self.STYLE_NAMES = STYLE_NAMES
self.style_json_file = ""
def load_pipe(
self,
base_model_id: str,
task_name="txt2img",
vae_model=None,
type_model_precision=torch.float16,
reload=False,
sdxl_safetensors = False,
retain_model_in_memory = True,
) -> DiffusionPipeline:
if (
base_model_id == self.base_model_id
and task_name == self.task_name
and hasattr(self, "pipe")
and self.vae_model == vae_model
and self.pipe is not None
and reload == False
):
if self.type_model_precision == type_model_precision or self.device.type == "cpu":
return
if hasattr(self, "pipe") and os.path.isfile(base_model_id):
unload_model = False
if self.pipe == None:
unload_model = True
elif type_model_precision != self.type_model_precision and self.device.type != "cpu":
unload_model = True
else:
if hasattr(self, "pipe"):
unload_model = False
if self.pipe == None:
unload_model = True
else:
unload_model = True
self.type_model_precision = (
type_model_precision if torch.cuda.is_available() else torch.float32
)
if self.type_model_precision == torch.float32 and os.path.isfile(base_model_id):
logger.info(f"Working with full precision {str(self.type_model_precision)}")
# Load model
if self.base_model_id == base_model_id and self.pipe is not None and reload == False and self.vae_model == vae_model and unload_model == False:
#logger.info("Previous loaded base model") # not return
class_name = self.class_name
else:
# Unload previous model and stuffs
self.pipe = None
self.model_memory = {}
self.lora_memory = [None, None, None, None, None]
self.lora_scale_memory = [1.0, 1.0, 1.0, 1.0, 1.0]
self.LCMconfig = None
self.embed_loaded = []
self.FreeU = False
torch.cuda.empty_cache()
gc.collect()
# Load new model
            if os.path.isfile(base_model_id):  # local single-file checkpoint
if sdxl_safetensors:
logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix")
self.pipe = StableDiffusionXLPipeline.from_single_file(
base_model_id,
vae=AutoencoderKL.from_pretrained(
"madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
),
torch_dtype=self.type_model_precision,
)
class_name = "StableDiffusionXLPipeline"
else:
self.pipe = StableDiffusionPipeline.from_single_file(
base_model_id,
# vae=None
# if vae_model == None
# else AutoencoderKL.from_single_file(
# vae_model
# ),
torch_dtype=self.type_model_precision,
)
class_name = "StableDiffusionPipeline"
else:
file_config = hf_hub_download(repo_id=base_model_id, filename="model_index.json")
# Reading data from the JSON file
with open(file_config, 'r') as json_config:
data_config = json.load(json_config)
# Searching for the value of the "_class_name" key
if '_class_name' in data_config:
class_name = data_config['_class_name']
match class_name:
case "StableDiffusionPipeline":
self.pipe = StableDiffusionPipeline.from_pretrained(
base_model_id,
torch_dtype=self.type_model_precision,
)
case "StableDiffusionXLPipeline":
logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix")
try:
self.pipe = DiffusionPipeline.from_pretrained(
base_model_id,
vae=AutoencoderKL.from_pretrained(
"madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
),
torch_dtype=torch.float16,
use_safetensors=True,
variant="fp16",
add_watermarker=False,
)
except Exception as e:
logger.debug(e)
logger.debug("Loading model without parameter variant=fp16")
self.pipe = DiffusionPipeline.from_pretrained(
base_model_id,
vae=AutoencoderKL.from_pretrained(
"madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
),
torch_dtype=torch.float16,
use_safetensors=True,
add_watermarker=False,
)
self.base_model_id = base_model_id
self.class_name = class_name
        # Load the VAE after the model is loaded
if vae_model is None :
logger.debug("Default VAE")
pass
else:
if os.path.isfile(vae_model):
self.pipe.vae = AutoencoderKL.from_single_file(
vae_model
)
else:
self.pipe.vae = AutoencoderKL.from_pretrained(
vae_model,
subfolder = "vae",
)
try:
self.pipe.vae.to(self.type_model_precision)
except:
logger.warning(f"VAE: not in {self.type_model_precision}")
self.vae_model = vae_model
# Define base scheduler
self.default_scheduler = copy.deepcopy(self.pipe.scheduler)
logger.debug(f"Base sampler: {self.default_scheduler}")
if task_name in self.model_memory:
self.pipe = self.model_memory[task_name]
# Create new base values
#self.pipe.to(self.device)
# torch.cuda.empty_cache()
# gc.collect()
self.base_model_id = base_model_id
self.task_name = task_name
self.vae_model = vae_model
self.class_name = class_name
self.pipe.watermark = None
return
# Load task
model_id = CONTROLNET_MODEL_IDS[task_name]
if task_name == "inpaint":
match class_name:
case "StableDiffusionPipeline":
controlnet = ControlNetModel.from_pretrained(
model_id, torch_dtype=self.type_model_precision
)
self.pipe = StableDiffusionControlNetInpaintPipeline(
vae=self.pipe.vae,
text_encoder=self.pipe.text_encoder,
tokenizer=self.pipe.tokenizer,
unet=self.pipe.unet,
controlnet=controlnet,
scheduler=self.pipe.scheduler,
safety_checker=self.pipe.safety_checker,
feature_extractor=self.pipe.feature_extractor,
requires_safety_checker=self.pipe.config.requires_safety_checker,
)
case "StableDiffusionXLPipeline":
self.pipe = StableDiffusionXLInpaintPipeline(
vae=self.pipe.vae,
text_encoder=self.pipe.text_encoder,
text_encoder_2=self.pipe.text_encoder_2,
tokenizer=self.pipe.tokenizer,
tokenizer_2=self.pipe.tokenizer_2,
unet=self.pipe.unet,
# controlnet=self.controlnet,
scheduler=self.pipe.scheduler,
)
if task_name not in ["txt2img", "inpaint", "img2img"]:
match class_name:
case "StableDiffusionPipeline":
controlnet = ControlNetModel.from_pretrained(
model_id, torch_dtype=self.type_model_precision
)
self.pipe = StableDiffusionControlNetPipeline(
vae=self.pipe.vae,
text_encoder=self.pipe.text_encoder,
tokenizer=self.pipe.tokenizer,
unet=self.pipe.unet,
controlnet=controlnet,
scheduler=self.pipe.scheduler,
safety_checker=self.pipe.safety_checker,
feature_extractor=self.pipe.feature_extractor,
requires_safety_checker=self.pipe.config.requires_safety_checker,
)
self.pipe.scheduler = UniPCMultistepScheduler.from_config(self.pipe.scheduler.config)
case "StableDiffusionXLPipeline":
adapter = T2IAdapter.from_pretrained(
model_id,
torch_dtype=torch.float16,
varient="fp16",
).to(self.device)
self.pipe = StableDiffusionXLAdapterPipeline(
vae=self.pipe.vae,
text_encoder=self.pipe.text_encoder,
text_encoder_2=self.pipe.text_encoder_2,
tokenizer=self.pipe.tokenizer,
tokenizer_2=self.pipe.tokenizer_2,
unet=self.pipe.unet,
adapter=adapter,
scheduler=self.pipe.scheduler,
).to(self.device)
if task_name in ["txt2img", "img2img"]:
match class_name:
case "StableDiffusionPipeline":
self.pipe = StableDiffusionPipeline(
vae=self.pipe.vae,
text_encoder=self.pipe.text_encoder,
tokenizer=self.pipe.tokenizer,
unet=self.pipe.unet,
scheduler=self.pipe.scheduler,
safety_checker=self.pipe.safety_checker,
feature_extractor=self.pipe.feature_extractor,
requires_safety_checker=self.pipe.config.requires_safety_checker,
)
case "StableDiffusionXLPipeline":
self.pipe = StableDiffusionXLPipeline(
vae=self.pipe.vae,
text_encoder=self.pipe.text_encoder,
text_encoder_2=self.pipe.text_encoder_2,
tokenizer=self.pipe.tokenizer,
tokenizer_2=self.pipe.tokenizer_2,
unet=self.pipe.unet,
scheduler=self.pipe.scheduler,
)
if task_name == "img2img":
self.pipe = AutoPipelineForImage2Image.from_pipe(self.pipe)
# Create new base values
self.pipe.to(self.device)
torch.cuda.empty_cache()
gc.collect()
self.base_model_id = base_model_id
self.task_name = task_name
self.vae_model = vae_model
self.class_name = class_name
if self.class_name == "StableDiffusionXLPipeline":
self.pipe.enable_vae_slicing()
self.pipe.enable_vae_tiling()
self.pipe.watermark = None
if retain_model_in_memory == True and task_name not in self.model_memory:
self.model_memory[task_name] = self.pipe
return
def load_controlnet_weight(self, task_name: str) -> None:
torch.cuda.empty_cache()
gc.collect()
model_id = CONTROLNET_MODEL_IDS[task_name]
controlnet = ControlNetModel.from_pretrained(
model_id, torch_dtype=self.type_model_precision
)
controlnet.to(self.device)
torch.cuda.empty_cache()
gc.collect()
self.pipe.controlnet = controlnet
#self.task_name = task_name
@torch.autocast("cuda")
def run_pipe(
self,
prompt: str,
negative_prompt: str,
prompt_embeds,
negative_prompt_embeds,
control_image: PIL.Image.Image,
num_images: int,
num_steps: int,
guidance_scale: float,
clip_skip: int,
generator,
controlnet_conditioning_scale,
control_guidance_start,
control_guidance_end,
) -> list[PIL.Image.Image]:
# Return PIL images
# generator = torch.Generator().manual_seed(seed)
return self.pipe(
prompt=prompt,
negative_prompt=negative_prompt,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
guidance_scale=guidance_scale,
clip_skip=clip_skip,
num_images_per_prompt=num_images,
num_inference_steps=num_steps,
generator=generator,
controlnet_conditioning_scale=controlnet_conditioning_scale,
control_guidance_start=control_guidance_start,
control_guidance_end=control_guidance_end,
image=control_image,
).images
@torch.autocast("cuda")
def run_pipe_SD(
self,
prompt: str,
negative_prompt: str,
prompt_embeds,
negative_prompt_embeds,
num_images: int,
num_steps: int,
guidance_scale: float,
clip_skip: int,
height: int,
width: int,
generator,
) -> list[PIL.Image.Image]:
# Return PIL images
# generator = torch.Generator().manual_seed(seed)
self.preview_handle = None
return self.pipe(
prompt=prompt,
negative_prompt=negative_prompt,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
guidance_scale=guidance_scale,
clip_skip=clip_skip,
num_images_per_prompt=num_images,
num_inference_steps=num_steps,
generator=generator,
height=height,
width=width,
callback=self.callback_pipe if self.image_previews else None,
callback_steps=10 if self.image_previews else 100,
).images
# @torch.autocast('cuda')
# def run_pipe_SDXL(
# self,
# prompt: str,
# negative_prompt: str,
# prompt_embeds,
# negative_prompt_embeds,
# num_images: int,
# num_steps: int,
# guidance_scale: float,
# clip_skip: int,
# height : int,
# width : int,
# generator,
# seddd,
# conditioning,
# pooled,
# ) -> list[PIL.Image.Image]:
# # Return PIL images
# #generator = torch.Generator("cuda").manual_seed(seddd) # generator = torch.Generator("cuda").manual_seed(seed),
# return self.pipe(
# prompt = None,
# negative_prompt = None,
# prompt_embeds=conditioning[0:1],
# pooled_prompt_embeds=pooled[0:1],
# negative_prompt_embeds=conditioning[1:2],
# negative_pooled_prompt_embeds=pooled[1:2],
# height = height,
# width = width,
# num_inference_steps = num_steps,
# guidance_scale = guidance_scale,
# clip_skip = clip_skip,
# num_images_per_prompt = num_images,
# generator = generator,
# ).images
@torch.autocast("cuda")
def run_pipe_inpaint(
self,
prompt: str,
negative_prompt: str,
prompt_embeds,
negative_prompt_embeds,
control_image: PIL.Image.Image,
num_images: int,
num_steps: int,
guidance_scale: float,
clip_skip: int,
strength: float,
init_image,
control_mask,
controlnet_conditioning_scale,
control_guidance_start,
control_guidance_end,
generator,
) -> list[PIL.Image.Image]:
# Return PIL images
# generator = torch.Generator().manual_seed(seed)
return self.pipe(
prompt=None,
negative_prompt=None,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
eta=1.0,
strength=strength,
image=init_image, # original image
mask_image=control_mask, # mask, values of 0 to 255
control_image=control_image, # tensor control image
num_images_per_prompt=num_images,
num_inference_steps=num_steps,
guidance_scale=guidance_scale,
clip_skip=clip_skip,
generator=generator,
controlnet_conditioning_scale=controlnet_conditioning_scale,
control_guidance_start=control_guidance_start,
control_guidance_end=control_guidance_end,
).images
@torch.autocast("cuda")
def run_pipe_img2img(
self,
prompt: str,
negative_prompt: str,
prompt_embeds,
negative_prompt_embeds,
num_images: int,
num_steps: int,
guidance_scale: float,
clip_skip: int,
strength: float,
init_image,
generator,
) -> list[PIL.Image.Image]:
# Return PIL images
# generator = torch.Generator().manual_seed(seed)
return self.pipe(
prompt=None,
negative_prompt=None,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
eta=1.0,
strength=strength,
image=init_image, # original image
num_images_per_prompt=num_images,
num_inference_steps=num_steps,
guidance_scale=guidance_scale,
clip_skip=clip_skip,
generator=generator,
).images
    # The process_<task> methods below run the preprocessor and return the control image
@torch.inference_mode()
def process_canny(
self,
image: np.ndarray,
image_resolution: int,
preprocess_resolution: int,
low_threshold: int,
high_threshold: int,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
self.preprocessor.load("Canny")
control_image = self.preprocessor(
image=image,
low_threshold=low_threshold,
high_threshold=high_threshold,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
)
return control_image
@torch.inference_mode()
def process_mlsd(
self,
image: np.ndarray,
image_resolution: int,
preprocess_resolution: int,
value_threshold: float,
distance_threshold: float,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
self.preprocessor.load("MLSD")
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
thr_v=value_threshold,
thr_d=distance_threshold,
)
return control_image
@torch.inference_mode()
def process_scribble(
self,
image: np.ndarray,
image_resolution: int,
preprocess_resolution: int,
preprocessor_name: str,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
if preprocessor_name == "None":
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
control_image = PIL.Image.fromarray(image)
elif preprocessor_name == "HED":
self.preprocessor.load(preprocessor_name)
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
scribble=False,
)
elif preprocessor_name == "PidiNet":
self.preprocessor.load(preprocessor_name)
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
safe=False,
)
return control_image
@torch.inference_mode()
def process_scribble_interactive(
self,
image_and_mask: dict[str, np.ndarray],
image_resolution: int,
) -> list[PIL.Image.Image]:
if image_and_mask is None:
raise ValueError
image = image_and_mask["mask"]
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
control_image = PIL.Image.fromarray(image)
return control_image
@torch.inference_mode()
def process_softedge(
self,
image: np.ndarray,
image_resolution: int,
preprocess_resolution: int,
preprocessor_name: str,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
if preprocessor_name == "None":
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
control_image = PIL.Image.fromarray(image)
elif preprocessor_name in ["HED", "HED safe"]:
safe = "safe" in preprocessor_name
self.preprocessor.load("HED")
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
scribble=safe,
)
elif preprocessor_name in ["PidiNet", "PidiNet safe"]:
safe = "safe" in preprocessor_name
self.preprocessor.load("PidiNet")
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
safe=safe,
)
else:
raise ValueError
return control_image
@torch.inference_mode()
def process_openpose(
self,
image: np.ndarray,
image_resolution: int,
preprocess_resolution: int,
preprocessor_name: str,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
if preprocessor_name == "None":
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
control_image = PIL.Image.fromarray(image)
else:
self.preprocessor.load("Openpose")
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
hand_and_face=True,
)
return control_image
@torch.inference_mode()
def process_segmentation(
self,
image: np.ndarray,
image_resolution: int,
preprocess_resolution: int,
preprocessor_name: str,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
if preprocessor_name == "None":
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
control_image = PIL.Image.fromarray(image)
else:
self.preprocessor.load(preprocessor_name)
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
)
return control_image
@torch.inference_mode()
def process_depth(
self,
image: np.ndarray,
image_resolution: int,
preprocess_resolution: int,
preprocessor_name: str,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
if preprocessor_name == "None":
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
control_image = PIL.Image.fromarray(image)
else:
self.preprocessor.load(preprocessor_name)
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
)
return control_image
@torch.inference_mode()
def process_normal(
self,
image: np.ndarray,
image_resolution: int,
preprocess_resolution: int,
preprocessor_name: str,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
if preprocessor_name == "None":
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
control_image = PIL.Image.fromarray(image)
else:
self.preprocessor.load("NormalBae")
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
)
return control_image
@torch.inference_mode()
def process_lineart(
self,
image: np.ndarray,
image_resolution: int,
preprocess_resolution: int,
preprocessor_name: str,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
if preprocessor_name in ["None", "None (anime)"]:
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
control_image = PIL.Image.fromarray(image)
elif preprocessor_name in ["Lineart", "Lineart coarse"]:
coarse = "coarse" in preprocessor_name
self.preprocessor.load("Lineart")
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
coarse=coarse,
)
elif preprocessor_name == "Lineart (anime)":
self.preprocessor.load("LineartAnime")
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
detect_resolution=preprocess_resolution,
)
if self.class_name == "StableDiffusionPipeline":
if "anime" in preprocessor_name:
self.load_controlnet_weight("lineart_anime")
logger.info("Linear anime")
else:
self.load_controlnet_weight("lineart")
return control_image
@torch.inference_mode()
def process_shuffle(
self,
image: np.ndarray,
image_resolution: int,
preprocessor_name: str,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
if preprocessor_name == "None":
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
control_image = PIL.Image.fromarray(image)
else:
self.preprocessor.load(preprocessor_name)
control_image = self.preprocessor(
image=image,
image_resolution=image_resolution,
)
return control_image
@torch.inference_mode()
def process_ip2p(
self,
image: np.ndarray,
image_resolution: int,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
control_image = PIL.Image.fromarray(image)
return control_image
@torch.inference_mode()
def process_inpaint(
self,
image: np.ndarray,
image_resolution: int,
preprocess_resolution: int,
image_mask: str, ###
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
init_image = PIL.Image.fromarray(image)
image_mask = HWC3(image_mask)
image_mask = resize_image(image_mask, resolution=image_resolution)
control_mask = PIL.Image.fromarray(image_mask)
control_image = make_inpaint_condition(init_image, control_mask)
return init_image, control_mask, control_image
@torch.inference_mode()
def process_img2img(
self,
image: np.ndarray,
image_resolution: int,
) -> list[PIL.Image.Image]:
if image is None:
raise ValueError
image = HWC3(image)
image = resize_image(image, resolution=image_resolution)
init_image = PIL.Image.fromarray(image)
return init_image
def get_scheduler(self, name):
if name in SCHEDULER_CONFIG_MAP:
scheduler_class, config = SCHEDULER_CONFIG_MAP[name]
#return scheduler_class.from_config(self.pipe.scheduler.config, **config)
# beta self.default_scheduler
return scheduler_class.from_config(self.default_scheduler.config, **config)
else:
raise ValueError(f"Scheduler with name {name} not found. Valid schedulers: {', '.join(scheduler_names)}")
def create_prompt_embeds(
self,
prompt,
negative_prompt,
textual_inversion,
clip_skip,
syntax_weights,
):
if self.class_name == "StableDiffusionPipeline":
if self.embed_loaded != textual_inversion and textual_inversion != []:
# Textual Inversion
for name, directory_name in textual_inversion:
try:
if directory_name.endswith(".pt"):
model = torch.load(directory_name, map_location=self.device)
model_tensors = model.get("string_to_param").get("*")
s_model = {"emb_params": model_tensors}
# save_file(s_model, directory_name[:-3] + '.safetensors')
self.pipe.load_textual_inversion(s_model, token=name)
else:
# self.pipe.text_encoder.resize_token_embeddings(len(self.pipe.tokenizer),pad_to_multiple_of=128)
# self.pipe.load_textual_inversion("./bad_prompt.pt", token="baddd")
self.pipe.load_textual_inversion(directory_name, token=name)
if not self.gui_active:
logger.info(f"Applied : {name}")
except Exception as e:
exception = str(e)
if name in exception:
logger.debug(f"Previous loaded embed {name}")
else:
logger.error(exception)
logger.error(f"Can't apply embed {name}")
self.embed_loaded = textual_inversion
# Clip skip
# clip_skip_diffusers = None #clip_skip - 1 # future update
if not hasattr(self, "compel"):
self.compel = Compel(
tokenizer=self.pipe.tokenizer,
text_encoder=self.pipe.text_encoder,
truncate_long_prompts=False,
returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NORMALIZED if clip_skip else ReturnedEmbeddingsType.LAST_HIDDEN_STATES_NORMALIZED,
)
# Prompt weights for textual inversion
prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer)
negative_prompt_ti = self.pipe.maybe_convert_prompt(
negative_prompt, self.pipe.tokenizer
)
# separate the multi-vector textual inversion by comma
if self.embed_loaded != []:
prompt_ti = add_comma_after_pattern_ti(prompt_ti)
negative_prompt_ti = add_comma_after_pattern_ti(negative_prompt_ti)
# Syntax weights
self.pipe.to(self.device)
if syntax_weights == "Classic":
prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel)
negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel)
else:
prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel, compel_process_sd=True)
negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel, compel_process_sd=True)
# Fix error shape
if prompt_emb.shape != negative_prompt_emb.shape:
(
prompt_emb,
negative_prompt_emb,
) = self.compel.pad_conditioning_tensors_to_same_length(
[prompt_emb, negative_prompt_emb]
)
return prompt_emb, negative_prompt_emb
else:
# SDXL embed
if self.embed_loaded != textual_inversion and textual_inversion != []:
# Textual Inversion
for name, directory_name in textual_inversion:
try:
from safetensors.torch import load_file
state_dict = load_file(directory_name)
self.pipe.load_textual_inversion(state_dict["clip_g"], token=name, text_encoder=self.pipe.text_encoder_2, tokenizer=self.pipe.tokenizer_2)
self.pipe.load_textual_inversion(state_dict["clip_l"], token=name, text_encoder=self.pipe.text_encoder, tokenizer=self.pipe.tokenizer)
if not self.gui_active:
logger.info(f"Applied : {name}")
except Exception as e:
exception = str(e)
if name in exception:
logger.debug(f"Previous loaded embed {name}")
else:
logger.error(exception)
logger.error(f"Can't apply embed {name}")
self.embed_loaded = textual_inversion
if not hasattr(self, "compel"):
# Clip skip
if clip_skip:
# clip_skip_diffusers = None #clip_skip - 1 # future update
self.compel = Compel(
tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2],
text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2],
returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NON_NORMALIZED,
requires_pooled=[False, True],
truncate_long_prompts=False,
)
else:
# clip_skip_diffusers = None # clip_skip = None # future update
self.compel = Compel(
tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2],
text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2],
requires_pooled=[False, True],
truncate_long_prompts=False,
)
# Prompt weights for textual inversion
try:
prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer)
negative_prompt_ti = self.pipe.maybe_convert_prompt(negative_prompt, self.pipe.tokenizer)
except:
prompt_ti = prompt
negative_prompt_ti = negative_prompt
logger.error("FAILED: Convert prompt for textual inversion")
            # Prompt weight syntax (A1111 "Classic" style vs Compel)
if syntax_weights == "Classic":
self.pipe.to("cuda")
prompt_ti = get_embed_new(prompt_ti, self.pipe, self.compel, only_convert_string=True)
negative_prompt_ti = get_embed_new(negative_prompt_ti, self.pipe, self.compel, only_convert_string=True)
else:
prompt_ti = prompt
negative_prompt_ti = negative_prompt
conditioning, pooled = self.compel([prompt_ti, negative_prompt_ti])
return conditioning, pooled
def process_lora(self, select_lora, lora_weights_scale, unload=False):
device = "cuda" if torch.cuda.is_available() else "cpu"
if not unload:
if select_lora != None:
try:
self.pipe = lora_mix_load(
self.pipe,
select_lora,
lora_weights_scale,
device=device,
dtype=self.type_model_precision,
)
logger.info(select_lora)
except Exception as e:
logger.error(f"ERROR: LoRA not compatible: {select_lora}")
logger.debug(f"{str(e)}")
return self.pipe
else:
            # Unloading this way is numerically unstable but fast and uses less memory
if select_lora != None:
try:
self.pipe = lora_mix_load(
self.pipe,
select_lora,
-lora_weights_scale,
device=device,
dtype=self.type_model_precision,
)
logger.debug(f"Unload LoRA: {select_lora}")
except:
pass
return self.pipe
def load_style_file(self, style_json_file):
if os.path.exists(style_json_file):
try:
file_json_read = get_json_content(style_json_file)
self.styles_data = {k["name"]: (k["prompt"], k["negative_prompt"]) for k in file_json_read}
self.STYLE_NAMES = list(self.styles_data.keys())
self.style_json_file = style_json_file
logger.info(f"Styles json file loaded with {len(self.STYLE_NAMES)} styles")
logger.debug(str(self.STYLE_NAMES))
except Exception as e:
logger.error(str(e))
else:
logger.error("Not found styles json file in directory")
def callback_pipe(self, iter, t, latents):
# convert latents to image
with torch.no_grad():
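            # Scale latents by the SD 1.5 VAE scaling factor (1 / 0.18215) before decoding the preview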
latents = 1 / 0.18215 * latents
image = self.pipe.vae.decode(latents).sample
image = (image / 2 + 0.5).clamp(0, 1)
            # we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# convert to PIL Images
image = self.pipe.numpy_to_pil(image)
# show one image
# global preview_handle
if self.preview_handle == None:
self.preview_handle = display(image[0], display_id=True)
else:
self.preview_handle.update(image[0])
def __call__(
self,
prompt: str = "",
negative_prompt: str = "",
img_height: int = 512,
img_width: int = 512,
num_images: int = 1,
num_steps: int = 30,
guidance_scale: float = 7.5,
clip_skip: Optional[bool] = True,
seed: int = -1,
sampler: str = "DPM++ 2M",
syntax_weights: str = "Classic",
lora_A: Optional[str] = None,
lora_scale_A: float = 1.0,
lora_B: Optional[str] = None,
lora_scale_B: float = 1.0,
lora_C: Optional[str] = None,
lora_scale_C: float = 1.0,
lora_D: Optional[str] = None,
lora_scale_D: float = 1.0,
lora_E: Optional[str] = None,
lora_scale_E: float = 1.0,
textual_inversion: List[Tuple[str, str]] = [],
FreeU: bool = False,
adetailer_A: bool = False,
adetailer_A_params: Dict[str, Any] = {},
adetailer_B: bool = False,
adetailer_B_params: Dict[str, Any] = {},
style_prompt: Optional[Any] = [""],
style_json_file: Optional[Any] = "",
image: Optional[Any] = None,
preprocessor_name: Optional[str] = "None",
preprocess_resolution: int = 512,
image_resolution: int = 512,
image_mask: Optional[Any] = None,
strength: float = 0.35,
low_threshold: int = 100,
high_threshold: int = 200,
value_threshold: float = 0.1,
distance_threshold: float = 0.1,
controlnet_conditioning_scale: float = 1.0,
control_guidance_start: float = 0.0,
control_guidance_end: float = 1.0,
t2i_adapter_preprocessor: bool = True,
t2i_adapter_conditioning_scale: float = 1.0,
t2i_adapter_conditioning_factor: float = 1.0,
upscaler_model_path: Optional[str] = None, # add latent
upscaler_increases_size: float = 1.5,
esrgan_tile: int = 100,
esrgan_tile_overlap: int = 10,
hires_steps: int = 25,
hires_denoising_strength: float = 0.35,
hires_prompt: str = "",
hires_negative_prompt: str = "",
hires_sampler: str = "Use same sampler",
loop_generation: int = 1,
display_images: bool = False,
save_generated_images: bool = True,
image_storage_location: str = "./images",
generator_in_cpu: bool = False,
leave_progress_bar: bool = False,
disable_progress_bar: bool = False,
hires_before_adetailer: bool = False,
hires_after_adetailer: bool = True,
retain_compel_previous_load: bool = False,
retain_detailfix_model_previous_load: bool = False,
retain_hires_model_previous_load: bool = False,
image_previews: bool = False,
xformers_memory_efficient_attention: bool = False,
gui_active: bool = False,
):
"""
The call function for the generation.
Args:
            prompt (str, optional):
                The prompt or prompts to guide image generation.
            negative_prompt (str, optional):
                The prompt or prompts to guide what not to include in image generation. Ignored when not using guidance (`guidance_scale < 1`).
img_height (int, optional, defaults to 512):
The height in pixels of the generated image.
img_width (int, optional, defaults to 512):
The width in pixels of the generated image.
num_images (int, optional, defaults to 1):
The number of images to generate per prompt.
num_steps (int, optional, defaults to 30):
The number of denoising steps. More denoising steps usually lead to a higher quality image at the
expense of slower inference.
guidance_scale (float, optional, defaults to 7.5):
A higher guidance scale value encourages the model to generate images closely linked to the text
`prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`.
            clip_skip (bool, optional):
                Whether to skip a CLIP layer when computing the prompt embeddings: True uses the
                penultimate layer, False uses the last layer.
            seed (int, optional, defaults to -1):
                A seed for controlling the randomness of the image generation process. -1 selects a random seed.
            sampler (str, optional, defaults to "DPM++ 2M"):
                The sampler used for the generation process. Available samplers: DPM++ 2M, DPM++ 2M Karras, DPM++ 2M SDE,
                DPM++ 2M SDE Karras, DPM++ SDE, DPM++ SDE Karras, DPM2, DPM2 Karras, DPM2 a, DPM2 a Karras, Euler, Euler a,
                Heun, LMS, LMS Karras, DDIM, DEIS, UniPC, PNDM, LCM, DPM++ 2M Lu, DPM++ 2M Ef, DPM++ 2M SDE Lu,
                DPM++ 2M SDE Ef, EDMDPM and EDMEuler.
syntax_weights (str, optional, defaults to "Classic"):
Specifies the type of syntax weights used during generation. "Classic" is (word:weight), "Compel" is (word)weight
lora_A (str, optional):
Placeholder for lora A parameter.
lora_scale_A (float, optional, defaults to 1.0):
Placeholder for lora scale A parameter.
lora_B (str, optional):
Placeholder for lora B parameter.
lora_scale_B (float, optional, defaults to 1.0):
Placeholder for lora scale B parameter.
lora_C (str, optional):
Placeholder for lora C parameter.
lora_scale_C (float, optional, defaults to 1.0):
Placeholder for lora scale C parameter.
lora_D (str, optional):
Placeholder for lora D parameter.
lora_scale_D (float, optional, defaults to 1.0):
Placeholder for lora scale D parameter.
lora_E (str, optional):
Placeholder for lora E parameter.
lora_scale_E (float, optional, defaults to 1.0):
Placeholder for lora scale E parameter.
            textual_inversion (List[Tuple[str, str]], optional, defaults to []):
                Placeholder for a list of textual inversion tuples. Helps the model adapt to a particular
                style. [("<token_activation>", "<path_embedding>"), ...]
            FreeU (bool, optional, defaults to False):
                A method that substantially improves diffusion model sample quality at no extra cost.
            adetailer_A (bool, optional, defaults to False):
                Guided inpainting to correct the image; it is preferable to use low values for strength.
adetailer_A_params (Dict[str, Any], optional, defaults to {}):
Placeholder for adetailer_A parameters in a dict example {"prompt": "my prompt", "inpaint_only": True ...}.
If not specified, default values will be used:
- face_detector_ad (bool): Indicates whether face detection is enabled. Defaults to True.
- person_detector_ad (bool): Indicates whether person detection is enabled. Defaults to True.
- hand_detector_ad (bool): Indicates whether hand detection is enabled. Defaults to False.
- prompt (str): A prompt for the adetailer_A. Defaults to an empty string.
- negative_prompt (str): A negative prompt for the adetailer_A. Defaults to an empty string.
- strength (float): The strength parameter value. Defaults to 0.35.
- mask_dilation (int): The mask dilation value. Defaults to 4.
- mask_blur (int): The mask blur value. Defaults to 4.
- mask_padding (int): The mask padding value. Defaults to 32.
- inpaint_only (bool): Indicates if only inpainting is to be performed. Defaults to True. False is img2img mode
- sampler (str): The sampler type to be used. Defaults to "Use same sampler".
            adetailer_B (bool, optional, defaults to False):
                Guided inpainting to correct the image; it is preferable to use low values for strength.
adetailer_B_params (Dict[str, Any], optional, defaults to {}):
Placeholder for adetailer_B parameters in a dict example {"prompt": "my prompt", "inpaint_only": True ...}.
If not specified, default values will be used.
style_prompt (str, optional):
If a style that is in STYLE_NAMES is specified, it will be added to the original prompt and negative prompt.
style_json_file (str, optional):
JSON with styles to be applied and used in style_prompt.
upscaler_model_path (str, optional):
Placeholder for upscaler model path.
upscaler_increases_size (float, optional, defaults to 1.5):
Placeholder for upscaler increases size parameter.
            esrgan_tile (int, optional, defaults to 100):
                Tile size when using an ESRGAN model.
            esrgan_tile_overlap (int, optional, defaults to 10):
                Tile overlap when using an ESRGAN model.
hires_steps (int, optional, defaults to 25):
The number of denoising steps for hires. More denoising steps usually lead to a higher quality image at the
expense of slower inference.
hires_denoising_strength (float, optional, defaults to 0.35):
Strength parameter for the hires.
hires_prompt (str , optional):
The prompt for hires. If not specified, the main prompt will be used.
hires_negative_prompt (str , optional):
The negative prompt for hires. If not specified, the main negative prompt will be used.
hires_sampler (str, optional, defaults to "Use same sampler"):
The sampler used for the hires generation process. If not specified, the main sampler will be used.
image (Any, optional):
The image to be used for the Inpaint, ControlNet, or T2I adapter.
preprocessor_name (str, optional, defaults to "None"):
Preprocessor name for ControlNet.
preprocess_resolution (int, optional, defaults to 512):
Preprocess resolution for the Inpaint, ControlNet, or T2I adapter.
image_resolution (int, optional, defaults to 512):
Image resolution for the Img2Img, Inpaint, ControlNet, or T2I adapter.
            image_mask (Any, optional):
                Path to the image mask for Inpaint.
strength (float, optional, defaults to 0.35):
Strength parameter for the Inpaint and Img2Img.
low_threshold (int, optional, defaults to 100):
Low threshold parameter for ControlNet and T2I Adapter Canny.
high_threshold (int, optional, defaults to 200):
High threshold parameter for ControlNet and T2I Adapter Canny.
value_threshold (float, optional, defaults to 0.1):
Value threshold parameter for ControlNet MLSD.
distance_threshold (float, optional, defaults to 0.1):
Distance threshold parameter for ControlNet MLSD.
controlnet_conditioning_scale (float, optional, defaults to 1.0):
The outputs of the ControlNet are multiplied by `controlnet_conditioning_scale` before they are added
to the residual in the original `unet`. Used in ControlNet and Inpaint
control_guidance_start (float, optional, defaults to 0.0):
The percentage of total steps at which the ControlNet starts applying. Used in ControlNet and Inpaint
control_guidance_end (float, optional, defaults to 1.0):
The percentage of total steps at which the ControlNet stops applying. Used in ControlNet and Inpaint
            t2i_adapter_preprocessor (bool, optional, defaults to True):
                Whether to run the preprocessor on the image for the SDXL T2I-Adapter tasks such as sdxl_canny.
t2i_adapter_conditioning_scale (float, optional, defaults to 1.0):
The outputs of the adapter are multiplied by `t2i_adapter_conditioning_scale` before they are added to the
residual in the original unet.
t2i_adapter_conditioning_factor (float, optional, defaults to 1.0):
The fraction of timesteps for which adapter should be applied. If `t2i_adapter_conditioning_factor` is
`0.0`, adapter is not applied at all. If `t2i_adapter_conditioning_factor` is `1.0`, adapter is applied for
all timesteps. If `t2i_adapter_conditioning_factor` is `0.5`, adapter is applied for half of the timesteps.
loop_generation (int, optional, defaults to 1):
The number of times the specified `num_images` will be generated.
            display_images (bool, optional, defaults to False):
                If you use a notebook, this parameter displays the generated images.
save_generated_images (bool, optional, defaults to True):
By default, the generated images are saved in the current location within the 'images' folder. You can disable this with this parameter.
image_storage_location (str , optional, defaults to "./images"):
The directory where the generated images are saved.
generator_in_cpu (bool, optional, defaults to False):
The generator by default is specified on the GPU. To obtain more consistent results across various environments,
it is preferable to use the generator on the CPU.
leave_progress_bar (bool, optional, defaults to False):
Leave the progress bar after generating the image.
disable_progress_bar (bool, optional, defaults to False):
Do not display the progress bar during image generation.
hires_before_adetailer (bool, optional, defaults to False):
Apply an upscale and high-resolution fix before adetailer.
hires_after_adetailer (bool, optional, defaults to True):
Apply an upscale and high-resolution fix after adetailer.
retain_compel_previous_load (bool, optional, defaults to False):
The previous compel remains preloaded in memory.
retain_detailfix_model_previous_load (bool, optional, defaults to False):
The previous adetailer model remains preloaded in memory.
retain_hires_model_previous_load (bool, optional, defaults to False):
The previous hires model remains preloaded in memory.
            image_previews (bool, optional, defaults to False):
                Display previews of the image during the denoising process.
xformers_memory_efficient_attention (bool, optional, defaults to False):
Improves generation time, currently disabled.
            gui_active (bool, optional, defaults to False):
                Utility when used with a GUI; it changes the behavior, especially by displaying confirmation messages or options.
Specific parameter usage details:
Additional parameters that will be used in Inpaint:
- image
- image_mask
- image_resolution
- strength
for SD 1.5:
- controlnet_conditioning_scale
- control_guidance_start
- control_guidance_end
Additional parameters that will be used in img2img:
- image
- image_resolution
- strength
Additional parameters that will be used in ControlNet for SD 1.5 depending on the task:
- image
- preprocessor_name
- preprocess_resolution
- image_resolution
- controlnet_conditioning_scale
- control_guidance_start
- control_guidance_end
for Canny:
- low_threshold
- high_threshold
for MLSD:
- value_threshold
- distance_threshold
Additional parameters that will be used in T2I adapter for SDXL depending on the task:
- image
- preprocess_resolution
- image_resolution
- t2i_adapter_preprocessor
- t2i_adapter_conditioning_scale
- t2i_adapter_conditioning_factor
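        Example:
            A minimal, illustrative txt2img call (the model path and prompt values are placeholders;
            the exact return value depends on the configuration):

                model = Model_Diffusers(
                    base_model_id="runwayml/stable-diffusion-v1-5",
                    task_name="txt2img",
                )
                result = model(
                    prompt="a photo of a cat",
                    negative_prompt="blurry",
                    num_steps=30,
                    guidance_scale=7.5,
                    sampler="DPM++ 2M Karras",
                )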
"""
        if self.task_name != "txt2img" and image is None:
raise ValueError(
"You need to specify the <image> for this task."
)
if img_height % 8 != 0:
img_height = img_height + (8 - img_height % 8)
logger.warning(f"Height must be divisible by 8, changed to {str(img_height)}")
if img_width % 8 != 0:
img_width = img_width + (8 - img_width % 8)
logger.warning(f"Width must be divisible by 8, changed to {str(img_width)}")
if image_resolution % 8 != 0:
image_resolution = image_resolution + (8 - image_resolution % 8)
logger.warning(f"Image resolution must be divisible by 8, changed to {str(image_resolution)}")
if control_guidance_start >= control_guidance_end:
logger.error(
"Control guidance start (ControlNet Start Threshold) cannot be larger or equal to control guidance end (ControlNet Stop Threshold). The default values 0.0 and 1.0 will be used."
)
control_guidance_start, control_guidance_end = 0.0, 1.0
self.gui_active = gui_active
self.image_previews = image_previews
if self.pipe == None:
self.load_pipe(
self.base_model_id,
task_name=self.task_name,
vae_model=self.vae_model,
reload=True,
)
self.pipe.set_progress_bar_config(leave=leave_progress_bar)
self.pipe.set_progress_bar_config(disable=disable_progress_bar)
        xformers_memory_efficient_attention = False  # currently disabled
if xformers_memory_efficient_attention and torch.cuda.is_available():
self.pipe.disable_xformers_memory_efficient_attention()
self.pipe.to(self.device)
# Load style prompt file
if style_json_file != "" and style_json_file != self.style_json_file:
self.load_style_file(style_json_file)
# Set style
if isinstance(style_prompt, str):
style_prompt = [style_prompt]
if style_prompt != [""]:
prompt, negative_prompt = apply_style(style_prompt, prompt, negative_prompt, self.styles_data, self.STYLE_NAMES)
# LoRA load
if self.lora_memory == [
lora_A,
lora_B,
lora_C,
lora_D,
lora_E,
] and self.lora_scale_memory == [
lora_scale_A,
lora_scale_B,
lora_scale_C,
lora_scale_D,
lora_scale_E,
]:
for single_lora in self.lora_memory:
if single_lora != None:
logger.info(f"LoRA in memory: {single_lora}")
pass
else:
logger.debug("_un, re and load_ lora")
self.pipe = self.process_lora(
self.lora_memory[0], self.lora_scale_memory[0], unload=True
)
self.pipe = self.process_lora(
self.lora_memory[1], self.lora_scale_memory[1], unload=True
)
self.pipe = self.process_lora(
self.lora_memory[2], self.lora_scale_memory[2], unload=True
)
self.pipe = self.process_lora(
self.lora_memory[3], self.lora_scale_memory[3], unload=True
)
self.pipe = self.process_lora(
self.lora_memory[4], self.lora_scale_memory[4], unload=True
)
self.pipe = self.process_lora(lora_A, lora_scale_A)
self.pipe = self.process_lora(lora_B, lora_scale_B)
self.pipe = self.process_lora(lora_C, lora_scale_C)
self.pipe = self.process_lora(lora_D, lora_scale_D)
self.pipe = self.process_lora(lora_E, lora_scale_E)
self.lora_memory = [lora_A, lora_B, lora_C, lora_D, lora_E]
self.lora_scale_memory = [
lora_scale_A,
lora_scale_B,
lora_scale_C,
lora_scale_D,
lora_scale_E,
]
# LCM config
if sampler == "LCM" and self.LCMconfig == None:
if self.class_name == "StableDiffusionPipeline":
adapter_id = "latent-consistency/lcm-lora-sdv1-5"
elif self.class_name == "StableDiffusionXLPipeline":
adapter_id = "latent-consistency/lcm-lora-sdxl"
self.process_lora(adapter_id, 1.0)
self.LCMconfig = adapter_id
logger.info("LCM")
elif sampler != "LCM" and self.LCMconfig != None:
self.process_lora(self.LCMconfig, 1.0, unload=True)
self.LCMconfig = None
elif self.LCMconfig != None:
logger.info("LCM")
# FreeU
if FreeU:
logger.info("FreeU active")
if self.class_name == "StableDiffusionPipeline":
# sd
self.pipe.enable_freeu(s1=0.9, s2=0.2, b1=1.2, b2=1.4)
else:
# sdxl
self.pipe.enable_freeu(s1=0.6, s2=0.4, b1=1.1, b2=1.2)
self.FreeU = True
elif self.FreeU:
self.pipe.disable_freeu()
self.FreeU = False
# Prompt Optimizations
if hasattr(self, "compel") and not retain_compel_previous_load:
del self.compel
prompt_emb, negative_prompt_emb = self.create_prompt_embeds(
prompt=prompt,
negative_prompt=negative_prompt,
textual_inversion=textual_inversion,
clip_skip=clip_skip,
syntax_weights=syntax_weights,
)
if self.class_name != "StableDiffusionPipeline":
# Additional prompt for SDXL
conditioning, pooled = prompt_emb.clone(), negative_prompt_emb.clone()
prompt_emb = negative_prompt_emb = None
if torch.cuda.is_available() and xformers_memory_efficient_attention:
if xformers_memory_efficient_attention:
self.pipe.enable_xformers_memory_efficient_attention()
else:
self.pipe.disable_xformers_memory_efficient_attention()
try:
            # self.pipe.scheduler = DPMSolverSinglestepScheduler()  # fixing default params with a random scheduler is not recommended
self.pipe.scheduler = self.get_scheduler(sampler)
except Exception as e:
logger.debug(f"{e}")
logger.warning(f"Error in sampler, please try again")
#self.pipe = None
torch.cuda.empty_cache()
gc.collect()
return
self.pipe.safety_checker = None
# Get image Global
if self.task_name != "txt2img":
if isinstance(image, str):
# If the input is a string (file path), open it as an image
image_pil = Image.open(image)
numpy_array = np.array(image_pil, dtype=np.uint8)
elif isinstance(image, Image.Image):
# If the input is already a PIL Image, convert it to a NumPy array
numpy_array = np.array(image, dtype=np.uint8)
elif isinstance(image, np.ndarray):
# If the input is a NumPy array, np.uint8
numpy_array = image.astype(np.uint8)
else:
if gui_active:
logger.info(
"Not found image"
)
return
else:
raise ValueError(
"Unsupported image type or not control image found; Bug report to https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive"
)
# Extract the RGB channels
try:
array_rgb = numpy_array[:, :, :3]
except Exception:
logger.error("Unsupported image type")
raise ValueError(
"Unsupported image type; Bug report to https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive"
) # return
# Get params preprocess Global SD 1.5
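# Every ControlNet preprocessor receives the source image and target resolution; the
# preprocess resolution and preprocessor name are only added for the tasks that use them.
# Illustrative example of the resulting dict for a canny-style task (values assumed):
# {"image": array_rgb, "image_resolution": 512, "preprocess_resolution": 512}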
preprocess_params_config = {}
if self.task_name not in ["txt2img", "inpaint", "img2img"]:
preprocess_params_config["image"] = array_rgb
preprocess_params_config["image_resolution"] = image_resolution
if self.task_name != "ip2p":
if self.task_name != "shuffle":
preprocess_params_config[
"preprocess_resolution"
] = preprocess_resolution
if self.task_name != "mlsd" and self.task_name != "canny":
preprocess_params_config["preprocessor_name"] = preprocessor_name
# RUN Preprocess SD 1.5
if self.task_name == "inpaint":
# Get mask for Inpaint
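# The mask can come from an existing file (GUI mode) or be drawn interactively on a
# canvas, where the red strokes are converted into a binary mask image.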
if gui_active or os.path.exists(str(image_mask)):
# Read image mask from gui
mask_control_img = Image.open(image_mask)
numpy_array_mask = np.array(mask_control_img, dtype=np.uint8)
array_rgb_mask = numpy_array_mask[:, :, :3]
elif not gui_active:
# Convert control image to draw
import base64
import matplotlib.pyplot as plt
name_without_extension = os.path.splitext(image.split("/")[-1])[0]
with open(image, "rb") as image_file:
image64 = base64.b64encode(image_file.read()).decode("utf-8")
img = np.array(plt.imread(f"{image}")[:, :, :3])
# Create mask interactive
logger.info(f"Draw the mask on this canvas using the mouse. When you finish, press 'Finish' in the bottom side of the canvas.")
draw(
image64,
filename=f"./{name_without_extension}_draw.png",
w=img.shape[1],
h=img.shape[0],
line_width=0.04 * img.shape[1],
)
# Create mask and save
with_mask = np.array(
plt.imread(f"./{name_without_extension}_draw.png")[:, :, :3]
)
mask = (
(with_mask[:, :, 0] == 1)
* (with_mask[:, :, 1] == 0)
* (with_mask[:, :, 2] == 0)
)
plt.imsave(f"./{name_without_extension}_mask.png", mask, cmap="gray")
mask_control = f"./{name_without_extension}_mask.png"
logger.info(f"Mask saved: {mask_control}")
# Read image mask
mask_control_img = Image.open(mask_control)
numpy_array_mask = np.array(mask_control_img, dtype=np.uint8)
array_rgb_mask = numpy_array_mask[:, :, :3]
else:
raise ValueError("No images found")
init_image, control_mask, control_image = self.process_inpaint(
image=array_rgb,
image_resolution=image_resolution,
preprocess_resolution=preprocess_resolution, # Not used
image_mask=array_rgb_mask,
)
elif self.task_name == "openpose":
logger.info("Openpose")
control_image = self.process_openpose(**preprocess_params_config)
elif self.task_name == "canny":
logger.info("Canny")
control_image = self.process_canny(
**preprocess_params_config,
low_threshold=low_threshold,
high_threshold=high_threshold,
)
elif self.task_name == "mlsd":
logger.info("MLSD")
control_image = self.process_mlsd(
**preprocess_params_config,
value_threshold=value_threshold,
distance_threshold=distance_threshold,
)
elif self.task_name == "scribble":
logger.info("Scribble")
control_image = self.process_scribble(**preprocess_params_config)
elif self.task_name == "softedge":
logger.info("Softedge")
control_image = self.process_softedge(**preprocess_params_config)
elif self.task_name == "segmentation":
logger.info("Segmentation")
control_image = self.process_segmentation(**preprocess_params_config)
elif self.task_name == "depth":
logger.info("Depth")
control_image = self.process_depth(**preprocess_params_config)
elif self.task_name == "normalbae":
logger.info("NormalBae")
control_image = self.process_normal(**preprocess_params_config)
elif self.task_name == "lineart":
logger.info("Lineart")
control_image = self.process_lineart(**preprocess_params_config)
elif self.task_name == "shuffle":
logger.info("Shuffle")
control_image = self.process_shuffle(**preprocess_params_config)
elif self.task_name == "ip2p":
logger.info("Ip2p")
control_image = self.process_ip2p(**preprocess_params_config)
elif self.task_name == "img2img":
preprocess_params_config["image"] = array_rgb
preprocess_params_config["image_resolution"] = image_resolution
init_image = self.process_img2img(**preprocess_params_config)
# RUN Preprocess T2I for SDXL
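# For SDXL the T2I-Adapter tasks reuse the same preprocessors as SD 1.5; when
# t2i_adapter_preprocessor is False the raw image is passed through unprocessed ("None").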
if self.class_name == "StableDiffusionXLPipeline":
# Get params preprocess XL
preprocess_params_config_xl = {}
if self.task_name not in ["txt2img", "inpaint", "img2img"]:
preprocess_params_config_xl["image"] = array_rgb
preprocess_params_config_xl["preprocess_resolution"] = preprocess_resolution
preprocess_params_config_xl["image_resolution"] = image_resolution
# preprocess_params_config_xl["additional_prompt"] = additional_prompt # ""
if self.task_name == "sdxl_canny": # preprocessor true default
logger.info("SDXL Canny: Preprocessor active by default")
control_image = self.process_canny(
**preprocess_params_config_xl,
low_threshold=low_threshold,
high_threshold=high_threshold,
)
elif self.task_name == "sdxl_openpose":
logger.info("SDXL Openpose")
control_image = self.process_openpose(
preprocessor_name = "Openpose" if t2i_adapter_preprocessor else "None",
**preprocess_params_config_xl,
)
elif self.task_name == "sdxl_sketch":
logger.info("SDXL Scribble")
control_image = self.process_scribble(
preprocessor_name = "PidiNet" if t2i_adapter_preprocessor else "None",
**preprocess_params_config_xl,
)
elif self.task_name == "sdxl_depth-midas":
logger.info("SDXL Depth")
control_image = self.process_depth(
preprocessor_name = "Midas" if t2i_adapter_preprocessor else "None",
**preprocess_params_config_xl,
)
elif self.task_name == "sdxl_lineart":
logger.info("SDXL Lineart")
control_image = self.process_lineart(
preprocessor_name = "Lineart" if t2i_adapter_preprocessor else "None",
**preprocess_params_config_xl,
)
# Get general params for TASK
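# SD 1.5 pipes are called through the run_pipe_* wrappers and use the custom keys
# "num_images"/"num_steps", while SDXL pipes are called directly with the standard
# diffusers argument names. Illustrative example for SD 1.5 txt2img (values assumed):
# {"prompt": None, "prompt_embeds": ..., "negative_prompt_embeds": ...,
#  "num_images": 2, "num_steps": 30, "guidance_scale": 7.5, "clip_skip": None,
#  "height": 768, "width": 512}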
if self.class_name == "StableDiffusionPipeline":
# Base params pipe sd
pipe_params_config = {
"prompt": None, # prompt,
"negative_prompt": None, # negative_prompt,
"prompt_embeds": prompt_emb,
"negative_prompt_embeds": negative_prompt_emb,
"num_images": num_images,
"num_steps": num_steps,
"guidance_scale": guidance_scale,
"clip_skip": None, # clip_skip, because we use clip skip of compel
}
else:
# Base params pipe sdxl
pipe_params_config = {
"prompt" : None,
"negative_prompt" : None,
"num_inference_steps" : num_steps,
"guidance_scale" : guidance_scale,
"clip_skip" : None,
"num_images_per_prompt" : num_images,
}
# New params
if self.class_name == "StableDiffusionXLPipeline":
# pipe sdxl
if self.task_name == "txt2img":
pipe_params_config["height"] = img_height
pipe_params_config["width"] = img_width
elif self.task_name == "inpaint":
pipe_params_config["strength"] = strength
pipe_params_config["image"] = init_image
pipe_params_config["mask_image"] = control_mask
logger.info(f"Image resolution: {str(init_image.size)}")
elif self.task_name not in ["txt2img", "inpaint", "img2img"]:
pipe_params_config["image"] = control_image
pipe_params_config["adapter_conditioning_scale"] = t2i_adapter_conditioning_scale
pipe_params_config["adapter_conditioning_factor"] = t2i_adapter_conditioning_factor
logger.info(f"Image resolution: {str(control_image.size)}")
elif self.task_name == "img2img":
pipe_params_config["strength"] = strength
pipe_params_config["image"] = init_image
logger.info(f"Image resolution: {str(init_image.size)}")
elif self.task_name == "txt2img":
pipe_params_config["height"] = img_height
pipe_params_config["width"] = img_width
elif self.task_name == "inpaint":
pipe_params_config["strength"] = strength
pipe_params_config["init_image"] = init_image
pipe_params_config["control_mask"] = control_mask
pipe_params_config["control_image"] = control_image
pipe_params_config[
"controlnet_conditioning_scale"
] = controlnet_conditioning_scale
pipe_params_config["control_guidance_start"] = control_guidance_start
pipe_params_config["control_guidance_end"] = control_guidance_end
logger.info(f"Image resolution: {str(init_image.size)}")
elif self.task_name not in ["txt2img", "inpaint", "img2img"]:
pipe_params_config["control_image"] = control_image
pipe_params_config[
"controlnet_conditioning_scale"
] = controlnet_conditioning_scale
pipe_params_config["control_guidance_start"] = control_guidance_start
pipe_params_config["control_guidance_end"] = control_guidance_end
logger.info(f"Image resolution: {str(control_image.size)}")
elif self.task_name == "img2img":
pipe_params_config["strength"] = strength
pipe_params_config["init_image"] = init_image
logger.info(f"Image resolution: {str(init_image.size)}")
# detailfix params and pipe global
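# ADetailer ("detailfix") runs a detection + inpaint pass over the generated images
# (faces, persons, and optionally hands); any option not supplied by the user falls
# back to the defaults below.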
if adetailer_A or adetailer_B:
# global params detailfix
default_params_detailfix = {
"face_detector_ad" : True,
"person_detector_ad" : True,
"hand_detector_ad" : False,
"prompt": "",
"negative_prompt" : "",
"strength" : 0.35,
"mask_dilation" : 4,
"mask_blur" : 4,
"mask_padding" : 32,
#"sampler" : "Use same sampler",
#"inpaint_only" : True,
}
# Pipe detailfix_pipe
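# With "inpaint_only" a dedicated inpaint pipeline is built; otherwise an img2img
# variant of the current pipe is used. The pipe is kept on self.detailfix_pipe so it
# can be reused when retain_detailfix_model_previous_load is True.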
if not hasattr(self, "detailfix_pipe") or not retain_detailfix_model_previous_load:
if adetailer_A_params.get("inpaint_only", False) == True or adetailer_B_params.get("inpaint_only", False) == True:
detailfix_pipe = custom_task_model_loader(
pipe=self.pipe,
model_category="detailfix",
task_name=self.task_name,
torch_dtype=self.type_model_precision
)
else:
detailfix_pipe = custom_task_model_loader(
pipe=self.pipe,
model_category="detailfix_img2img",
task_name=self.task_name,
torch_dtype=self.type_model_precision
)
if hasattr(self, "detailfix_pipe"):
del self.detailfix_pipe
if retain_detailfix_model_previous_load:
if hasattr(self, "detailfix_pipe"):
detailfix_pipe = self.detailfix_pipe
else:
self.detailfix_pipe = detailfix_pipe
adetailer_A_params.pop("inpaint_only", None)
adetailer_B_params.pop("inpaint_only", None)
# Define base scheduler detailfix
detailfix_pipe.default_scheduler = copy.deepcopy(self.default_scheduler)
if adetailer_A_params.get("sampler", "Use same sampler") != "Use same sampler":
logger.debug("detailfix_pipe will use the sampler from adetailer_A")
detailfix_pipe.scheduler = self.get_scheduler(adetailer_A_params["sampler"])
adetailer_A_params.pop("sampler", None)
if adetailer_B_params.get("sampler", "Use same sampler") != "Use same sampler":
logger.debug("detailfix_pipe will use the sampler from adetailer_B")
detailfix_pipe.scheduler = self.get_scheduler(adetailer_B_params["sampler"])
adetailer_B_params.pop("sampler", None)
detailfix_pipe.set_progress_bar_config(leave=leave_progress_bar)
detailfix_pipe.set_progress_bar_config(disable=disable_progress_bar)
detailfix_pipe.to(self.device)
torch.cuda.empty_cache()
gc.collect()
if adetailer_A:
for key_param, default_value in default_params_detailfix.items():
if key_param not in adetailer_A_params:
adetailer_A_params[key_param] = default_value
elif type(default_value) != type(adetailer_A_params[key_param]):
logger.warning(f"DetailFix A: Error type param, set default {str(key_param)}")
adetailer_A_params[key_param] = default_value
detailfix_params_A = {
"prompt": adetailer_A_params["prompt"],
"negative_prompt" : adetailer_A_params["negative_prompt"],
"strength" : adetailer_A_params["strength"],
"num_inference_steps" : num_steps,
"guidance_scale" : guidance_scale,
}
# clear params yolo
adetailer_A_params.pop('strength', None)
adetailer_A_params.pop('prompt', None)
adetailer_A_params.pop('negative_prompt', None)
# Verify prompt detailfix_params_A and get valid
prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A, prompt_df_A, negative_prompt_df_A = process_prompts_valid(
detailfix_params_A["prompt"], detailfix_params_A["negative_prompt"], prompt, negative_prompt
)
# Params detailfix
if self.class_name == "StableDiffusionPipeline":
# SD detailfix
# detailfix_params_A["controlnet_conditioning_scale"] = controlnet_conditioning_scale
# detailfix_params_A["control_guidance_start"] = control_guidance_start
# detailfix_params_A["control_guidance_end"] = control_guidance_end
if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A:
detailfix_params_A["prompt_embeds"] = prompt_emb
detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb
else:
prompt_emb_ad, negative_prompt_emb_ad = self.create_prompt_embeds(
prompt=prompt_df_A,
negative_prompt=negative_prompt_df_A,
textual_inversion=textual_inversion,
clip_skip=clip_skip,
syntax_weights=syntax_weights,
)
detailfix_params_A["prompt_embeds"] = prompt_emb_ad
detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb_ad
detailfix_params_A["prompt"] = None
detailfix_params_A["negative_prompt"] = None
else:
# SDXL detailfix
if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A:
conditioning_detailfix_A, pooled_detailfix_A = conditioning, pooled
else:
conditioning_detailfix_A, pooled_detailfix_A = self.create_prompt_embeds(
prompt=prompt_df_A,
negative_prompt=negative_prompt_df_A,
textual_inversion=textual_inversion,
clip_skip=clip_skip,
syntax_weights=syntax_weights,
)
detailfix_params_A.pop('prompt', None)
detailfix_params_A.pop('negative_prompt', None)
detailfix_params_A["prompt_embeds"] = conditioning_detailfix_A[0:1]
detailfix_params_A["pooled_prompt_embeds"] = pooled_detailfix_A[0:1]
detailfix_params_A["negative_prompt_embeds"] = conditioning_detailfix_A[1:2]
detailfix_params_A["negative_pooled_prompt_embeds"] = pooled_detailfix_A[1:2]
logger.debug(f"detailfix A prompt empty {prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A}")
if not prompt_empty_detailfix_A or not negative_prompt_empty_detailfix_A:
logger.debug(f"Prompts detailfix A {prompt_df_A, negative_prompt_df_A}")
logger.debug(f"Pipe params detailfix A \n{detailfix_params_A}")
logger.debug(f"Params detailfix A \n{adetailer_A_params}")
if adetailer_B:
for key_param, default_value in default_params_detailfix.items():
if key_param not in adetailer_B_params:
adetailer_B_params[key_param] = default_value
elif type(default_value) != type(adetailer_B_params[key_param]):
logger.warning(f"DetailfFix B: Error type param, set default {str(key_param)}")
adetailer_B_params[key_param] = default_value
detailfix_params_B = {
"prompt": adetailer_B_params["prompt"],
"negative_prompt" : adetailer_B_params["negative_prompt"],
"strength" : adetailer_B_params["strength"],
"num_inference_steps" : num_steps,
"guidance_scale" : guidance_scale,
}
# clear params yolo
adetailer_B_params.pop('strength', None)
adetailer_B_params.pop('prompt', None)
adetailer_B_params.pop('negative_prompt', None)
# Verify prompt detailfix_params_B and get valid
prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B, prompt_df_B, negative_prompt_df_B = process_prompts_valid(
detailfix_params_B["prompt"], detailfix_params_B["negative_prompt"], prompt, negative_prompt
)
# Params detailfix
if self.class_name == "StableDiffusionPipeline":
# SD detailfix
# detailfix_params_B["controlnet_conditioning_scale"] = controlnet_conditioning_scale
# detailfix_params_B["control_guidance_start"] = control_guidance_start
# detailfix_params_B["control_guidance_end"] = control_guidance_end
if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B:
detailfix_params_B["prompt_embeds"] = prompt_emb
detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb
else:
prompt_emb_ad_b, negative_prompt_emb_ad_b = self.create_prompt_embeds(
prompt=prompt_df_B,
negative_prompt=negative_prompt_df_B,
textual_inversion=textual_inversion,
clip_skip=clip_skip,
syntax_weights=syntax_weights,
)
detailfix_params_B["prompt_embeds"] = prompt_emb_ad_b
detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb_ad_b
detailfix_params_B["prompt"] = None
detailfix_params_B["negative_prompt"] = None
else:
# SDXL detailfix
if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B:
conditioning_detailfix_B, pooled_detailfix_B = conditioning, pooled
else:
conditioning_detailfix_B, pooled_detailfix_B = self.create_prompt_embeds(
prompt=prompt_df_B,
negative_prompt=negative_prompt_df_B,
textual_inversion=textual_inversion,
clip_skip=clip_skip,
syntax_weights=syntax_weights,
)
detailfix_params_B.pop('prompt', None)
detailfix_params_B.pop('negative_prompt', None)
detailfix_params_B["prompt_embeds"] = conditioning_detailfix_B[0:1]
detailfix_params_B["pooled_prompt_embeds"] = pooled_detailfix_B[0:1]
detailfix_params_B["negative_prompt_embeds"] = conditioning_detailfix_B[1:2]
detailfix_params_B["negative_pooled_prompt_embeds"] = pooled_detailfix_B[1:2]
logger.debug(f"detailfix B prompt empty {prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B}")
if not prompt_empty_detailfix_B or not negative_prompt_empty_detailfix_B:
logger.debug(f"Prompts detailfix B {prompt_df_B, negative_prompt_df_B}")
logger.debug(f"Pipe params detailfix B \n{detailfix_params_B}")
logger.debug(f"Params detailfix B \n{adetailer_B_params}")
if hires_steps > 1 and upscaler_model_path is not None:
# Hires params BASE
hires_params_config = {
"prompt" : None,
"negative_prompt" : None,
"num_inference_steps" : hires_steps,
"guidance_scale" : guidance_scale,
"clip_skip" : None,
"strength" : hires_denoising_strength,
}
if self.class_name == "StableDiffusionPipeline":
hires_params_config["eta"] = 1.0
# Verify prompt hires and get valid
hires_prompt_empty, hires_negative_prompt_empty, prompt_hires_valid, negative_prompt_hires_valid = process_prompts_valid(
hires_prompt, hires_negative_prompt, prompt, negative_prompt
)
# Hires embed params
if self.class_name == "StableDiffusionPipeline":
if hires_prompt_empty and hires_negative_prompt_empty:
hires_params_config["prompt_embeds"] = prompt_emb
hires_params_config["negative_prompt_embeds"] = negative_prompt_emb
else:
prompt_emb_hires, negative_prompt_emb_hires = self.create_prompt_embeds(
prompt=prompt_hires_valid,
negative_prompt=negative_prompt_hires_valid,
textual_inversion=textual_inversion,
clip_skip=clip_skip,
syntax_weights=syntax_weights,
)
hires_params_config["prompt_embeds"] = prompt_emb_hires
hires_params_config["negative_prompt_embeds"] = negative_prompt_emb_hires
else:
if hires_prompt_empty and hires_negative_prompt_empty:
hires_conditioning, hires_pooled = conditioning, pooled
else:
hires_conditioning, hires_pooled = self.create_prompt_embeds(
prompt=prompt_hires_valid,
negative_prompt=negative_prompt_hires_valid,
textual_inversion=textual_inversion,
clip_skip=clip_skip,
syntax_weights=syntax_weights,
)
hires_params_config.pop('prompt', None)
hires_params_config.pop('negative_prompt', None)
hires_params_config["prompt_embeds"] = hires_conditioning[0:1]
hires_params_config["pooled_prompt_embeds"] = hires_pooled[0:1]
hires_params_config["negative_prompt_embeds"] = hires_conditioning[1:2]
hires_params_config["negative_pooled_prompt_embeds"] = hires_pooled[1:2]
# Hires pipe
if not hasattr(self, "hires_pipe") or not retain_hires_model_previous_load:
hires_pipe = custom_task_model_loader(
pipe=self.pipe,
model_category="hires",
task_name=self.task_name,
torch_dtype=self.type_model_precision
)
if hasattr(self, "hires_pipe"):
del self.hires_pipe
if retain_hires_model_previous_load:
if hasattr(self, "hires_pipe"):
hires_pipe = self.hires_pipe
else:
self.hires_pipe = hires_pipe
# Hires scheduler
if hires_sampler != "Use same sampler":
logger.debug("New hires sampler")
hires_pipe.scheduler = self.get_scheduler(hires_sampler)
hires_pipe.set_progress_bar_config(leave=leave_progress_bar)
hires_pipe.set_progress_bar_config(disable=disable_progress_bar)
hires_pipe.to(self.device)
torch.cuda.empty_cache()
gc.collect()
else:
hires_params_config = {}
hires_pipe = None
# Debug info
try:
logger.debug(f"INFO PIPE: {self.pipe.__class__.__name__}")
logger.debug(f"text_encoder_type: {self.pipe.text_encoder.dtype}")
logger.debug(f"unet_type: {self.pipe.unet.dtype}")
logger.debug(f"vae_type: {self.pipe.vae.dtype}")
logger.debug(f"pipe_type: {self.pipe.dtype}")
logger.debug(f"scheduler_main_pipe: {self.pipe.scheduler}")
if adetailer_A or adetailer_B:
logger.debug(f"scheduler_detailfix: {detailfix_pipe.scheduler}")
if hires_steps > 1 and upscaler_model_path is not None:
logger.debug(f"scheduler_hires: {hires_pipe.scheduler}")
except Exception as e:
logger.debug(f"{str(e)}")
# === RUN PIPE === #
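# Each loop iteration generates num_images images. With seed == -1 every image gets a
# random seed; otherwise the given seed is used for the first image and random seeds
# fill the rest of the batch.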
for i in range(loop_generation):
# number seed
if seed == -1:
seeds = [random.randint(0, 2147483647) for _ in range(num_images)]
else:
if num_images == 1:
seeds = [seed]
else:
seeds = [seed] + [random.randint(0, 2147483647) for _ in range(num_images-1)]
# generators
generators = [] # List to store all the generators
for calculate_seed in seeds:
if generator_in_cpu or self.device.type == "cpu":
generator = torch.Generator().manual_seed(calculate_seed)
else:
try:
generator = torch.Generator("cuda").manual_seed(calculate_seed)
except Exception:
logger.warning("CUDA generator unavailable; falling back to a CPU generator")
generator = torch.Generator().manual_seed(calculate_seed)
generators.append(generator)
# img2img batching workaround: pass a single generator (not a list) so the prompt tensors and generators match in number
pipe_params_config["generator"] = generators if self.task_name != "img2img" else generators[0] # no list
seeds = seeds if self.task_name != "img2img" else [seeds[0]] * num_images
try:
if self.class_name == "StableDiffusionXLPipeline":
# sdxl pipe
images = self.pipe(
prompt_embeds=conditioning[0:1],
pooled_prompt_embeds=pooled[0:1],
negative_prompt_embeds=conditioning[1:2],
negative_pooled_prompt_embeds=pooled[1:2],
#generator=pipe_params_config["generator"],
**pipe_params_config,
).images
if self.task_name not in ["txt2img", "inpaint", "img2img"]:
images = [control_image] + images
elif self.task_name == "txt2img":
images = self.run_pipe_SD(**pipe_params_config)
elif self.task_name == "inpaint":
images = self.run_pipe_inpaint(**pipe_params_config)
elif self.task_name not in ["txt2img", "inpaint", "img2img"]:
results = self.run_pipe(
**pipe_params_config
) ## pipe ControlNet add condition_weights
images = [control_image] + results
del results
elif self.task_name == "img2img":
images = self.run_pipe_img2img(**pipe_params_config)
except Exception as e:
e = str(e)
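# Some schedulers fail with this tensor-size error on certain configurations; retry the
# generation once with a DDIM scheduler before giving up.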
if "Tensor with 2 elements cannot be converted to Scalar" in e:
logger.debug(e)
logger.error("Error in sampler; trying with DDIM sampler")
self.pipe.scheduler = self.default_scheduler
self.pipe.scheduler = DDIMScheduler.from_config(self.pipe.scheduler.config)
if self.class_name == "StableDiffusionXLPipeline":
# sdxl pipe
images = self.pipe(
prompt_embeds=conditioning[0:1],
pooled_prompt_embeds=pooled[0:1],
negative_prompt_embeds=conditioning[1:2],
negative_pooled_prompt_embeds=pooled[1:2],
#generator=pipe_params_config["generator"],
**pipe_params_config,
).images
if self.task_name not in ["txt2img", "inpaint", "img2img"]:
images = [control_image] + images
elif self.task_name == "txt2img":
images = self.run_pipe_SD(**pipe_params_config)
elif self.task_name == "inpaint":
images = self.run_pipe_inpaint(**pipe_params_config)
elif self.task_name not in ["txt2img", "inpaint", "img2img"]:
results = self.run_pipe(
**pipe_params_config
) ## pipe ControlNet add condition_weights
images = [control_image] + results
del results
elif self.task_name == "img2img":
images = self.run_pipe_img2img(**pipe_params_config)
elif "The size of tensor a (0) must match the size of tensor b (3) at non-singleton" in e:
raise ValueError(f"steps / strength too low for the model to produce a satisfactory response")
else:
raise ValueError(e)
torch.cuda.empty_cache()
gc.collect()
if hires_before_adetailer and upscaler_model_path is not None:
logger.debug("Hires fix before adetailer; the same seed is reused for each image (no batching)")
images = process_images_high_resolution(
images,
upscaler_model_path,
upscaler_increases_size,
esrgan_tile, esrgan_tile_overlap,
hires_steps, hires_params_config,
self.task_name,
generators[0], #pipe_params_config["generator"][0], # no generator
hires_pipe,
)
# Adetailer stuff
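# The preprocessed control image occupies position 0 of the list for ControlNet/T2I
# tasks, so it is removed before detailfix and re-prepended afterwards.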
if adetailer_A or adetailer_B:
# image_pil_list = []
# for img_single in images:
# image_ad = img_single.convert("RGB")
# image_pil_list.append(image_ad)
if self.task_name not in ["txt2img", "inpaint", "img2img"]:
images = images[1:]
if adetailer_A:
images = ad_model_process(
pipe_params_df=detailfix_params_A,
detailfix_pipe=detailfix_pipe,
image_list_task=images,
**adetailer_A_params,
)
if adetailer_B:
images = ad_model_process(
pipe_params_df=detailfix_params_B,
detailfix_pipe=detailfix_pipe,
image_list_task=images,
**adetailer_B_params,
)
if self.task_name not in ["txt2img", "inpaint", "img2img"]:
images = [control_image] + images
# del detailfix_pipe
torch.cuda.empty_cache()
gc.collect()
if hires_after_adetailer and upscaler_model_path is not None:
logger.debug("Hires fix after adetailer; the same seed is reused for each image (no batching)")
images = process_images_high_resolution(
images,
upscaler_model_path,
upscaler_increases_size,
esrgan_tile, esrgan_tile_overlap,
hires_steps, hires_params_config,
self.task_name,
generators[0], #pipe_params_config["generator"][0], # no generator
hires_pipe,
)
logger.info(f"Seeds: {seeds}")
# Show images if loop
if display_images:
mediapy.show_images(images)
# logger.info(image_list)
# del images
if loop_generation > 1:
time.sleep(0.5)
# List images and save
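# Metadata layout used when saving: [prompt, negative_prompt, model, vae, steps,
# guidance, sampler, seed, width, height, clip_skip]; the seed slot (index 7) is
# overwritten with each image's own seed. For ControlNet tasks a dummy seed of 0 is
# paired with the control image at position 0.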
image_list = []
metadata = [
prompt,
negative_prompt,
self.base_model_id,
self.vae_model,
num_steps,
guidance_scale,
sampler,
0,  # placeholder seed; replaced with each image's seed before saving
img_width,
img_height,
clip_skip,
]
valid_seeds = [0] + seeds if self.task_name not in ["txt2img", "inpaint", "img2img"] else seeds
for image_, seed_ in zip(images, valid_seeds):
image_path = "not saved in storage"
if save_generated_images:
metadata[7] = seed_
image_path = save_pil_image_with_metadata(image_, image_storage_location, metadata)
image_list.append(image_path)
torch.cuda.empty_cache()
gc.collect()
if image_list[0] != "not saved in storage":
logger.info(image_list)
if hasattr(self, "compel") and not retain_compel_previous_load:
del self.compel
torch.cuda.empty_cache()
gc.collect()
return images, image_list