|
import inspect |
|
|
|
from pathlib import Path |
|
from tempfile import TemporaryDirectory |
|
from typing import Any, Callable, Dict, List, Optional, Union
|
|
|
import numpy as np |
|
import openvino |
|
import torch |
|
|
|
from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput |
|
from optimum.intel.openvino.modeling_diffusion import (
    OVModelTextEncoder,
    OVModelUnet,
    OVModelVaeDecoder,
    OVModelVaeEncoder,
    OVStableDiffusionPipeline,
    VaeImageProcessor,
)
|
from optimum.utils import ( |
|
DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER, |
|
DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER, |
|
DIFFUSION_MODEL_UNET_SUBFOLDER, |
|
DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER, |
|
DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER, |
|
) |
|
|
|
|
|
from diffusers.utils import logging
|
logger = logging.get_logger(__name__) |
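
# This module provides an OpenVINO pipeline for Latent Consistency Models (LCM): a unet wrapper that
# forwards the guidance-scale embedding (`timestep_cond`) to the compiled model, and a pipeline that
# denoises in a handful of steps without classifier-free guidance.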
|
|
|
class LCMOVModelUnet(OVModelUnet): |
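    """`OVModelUnet` wrapper whose `__call__` also accepts the LCM-specific inputs (`timestep_cond`,
    `text_embeds`, `time_ids`) and forwards them to the compiled unet when provided."""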
|
def __call__( |
|
self, |
|
sample: np.ndarray, |
|
timestep: np.ndarray, |
|
encoder_hidden_states: np.ndarray, |
|
timestep_cond: Optional[np.ndarray] = None, |
|
text_embeds: Optional[np.ndarray] = None, |
|
time_ids: Optional[np.ndarray] = None, |
|
): |
|
self._compile() |
|
|
|
inputs = { |
|
"sample": sample, |
|
"timestep": timestep, |
|
"encoder_hidden_states": encoder_hidden_states, |
|
} |
|
|
|
if timestep_cond is not None: |
|
inputs["timestep_cond"] = timestep_cond |
|
if text_embeds is not None: |
|
inputs["text_embeds"] = text_embeds |
|
if time_ids is not None: |
|
inputs["time_ids"] = time_ids |
|
|
|
        # Share input buffers with the inference request to avoid extra copies
        # (newer OpenVINO releases rename this keyword to `share_inputs`).
        outputs = self.request(inputs, shared_memory=True)
|
return list(outputs.values()) |
|
|
|
class OVLatentConsistencyModelPipeline(OVStableDiffusionPipeline): |
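    """Text-to-image pipeline for Latent Consistency Models on OpenVINO. Mirrors
    `OVStableDiffusionPipeline`, but uses `LCMOVModelUnet` so the guidance-scale embedding can be
    fed to the unet as `timestep_cond`, and performs no classifier-free guidance."""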
|
|
|
def __init__( |
|
self, |
|
vae_decoder: openvino.runtime.Model, |
|
text_encoder: openvino.runtime.Model, |
|
unet: openvino.runtime.Model, |
|
config: Dict[str, Any], |
|
tokenizer: "CLIPTokenizer", |
|
scheduler: Union["DDIMScheduler", "PNDMScheduler", "LMSDiscreteScheduler"], |
|
feature_extractor: Optional["CLIPFeatureExtractor"] = None, |
|
vae_encoder: Optional[openvino.runtime.Model] = None, |
|
text_encoder_2: Optional[openvino.runtime.Model] = None, |
|
tokenizer_2: Optional["CLIPTokenizer"] = None, |
|
device: str = "CPU", |
|
dynamic_shapes: bool = True, |
|
compile: bool = True, |
|
ov_config: Optional[Dict[str, str]] = None, |
|
model_save_dir: Optional[Union[str, Path, TemporaryDirectory]] = None, |
|
**kwargs, |
|
): |
|
self._internal_dict = config |
|
self._device = device.upper() |
|
self.is_dynamic = dynamic_shapes |
|
self.ov_config = ov_config if ov_config is not None else {} |
|
self._model_save_dir = ( |
|
Path(model_save_dir.name) if isinstance(model_save_dir, TemporaryDirectory) else model_save_dir |
|
) |
|
self.vae_decoder = OVModelVaeDecoder(vae_decoder, self) |
|
self.unet = LCMOVModelUnet(unet, self) |
|
self.text_encoder = OVModelTextEncoder(text_encoder, self) if text_encoder is not None else None |
|
self.text_encoder_2 = ( |
|
OVModelTextEncoder(text_encoder_2, self, model_name=DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER) |
|
if text_encoder_2 is not None |
|
else None |
|
) |
|
self.vae_encoder = OVModelVaeEncoder(vae_encoder, self) if vae_encoder is not None else None |
|
|
|
if "block_out_channels" in self.vae_decoder.config: |
|
self.vae_scale_factor = 2 ** (len(self.vae_decoder.config["block_out_channels"]) - 1) |
|
else: |
|
self.vae_scale_factor = 8 |
|
|
|
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor) |
|
|
|
self.tokenizer = tokenizer |
|
self.tokenizer_2 = tokenizer_2 |
|
self.scheduler = scheduler |
|
self.feature_extractor = feature_extractor |
|
self.safety_checker = None |
|
self.preprocessors = [] |
|
|
|
if self.is_dynamic: |
|
self.reshape(batch_size=-1, height=-1, width=-1, num_images_per_prompt=-1) |
|
|
|
if compile: |
|
self.compile() |
|
|
|
sub_models = { |
|
DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER: self.text_encoder, |
|
DIFFUSION_MODEL_UNET_SUBFOLDER: self.unet, |
|
DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER: self.vae_decoder, |
|
DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER: self.vae_encoder, |
|
DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER: self.text_encoder_2, |
|
} |
|
for name in sub_models.keys(): |
|
self._internal_dict[name] = ( |
|
("optimum", sub_models[name].__class__.__name__) if sub_models[name] is not None else (None, None) |
|
) |
|
|
|
self._internal_dict.pop("vae", None) |
|
|
|
def _reshape_unet( |
|
self, |
|
model: openvino.runtime.Model, |
|
batch_size: int = -1, |
|
height: int = -1, |
|
width: int = -1, |
|
num_images_per_prompt: int = -1, |
|
tokenizer_max_length: int = -1, |
|
): |
|
if batch_size == -1 or num_images_per_prompt == -1: |
|
batch_size = -1 |
|
else: |
|
batch_size = batch_size * num_images_per_prompt |
|
|
|
height = height // self.vae_scale_factor if height > 0 else height |
|
width = width // self.vae_scale_factor if width > 0 else width |
|
shapes = {} |
|
for inputs in model.inputs: |
|
shapes[inputs] = inputs.get_partial_shape() |
|
if inputs.get_any_name() == "timestep": |
|
shapes[inputs][0] = 1 |
|
elif inputs.get_any_name() == "sample": |
|
in_channels = self.unet.config.get("in_channels", None) |
|
if in_channels is None: |
|
in_channels = shapes[inputs][1] |
|
if in_channels.is_dynamic: |
|
logger.warning( |
|
"Could not identify `in_channels` from the unet configuration, to statically reshape the unet please provide a configuration." |
|
) |
|
self.is_dynamic = True |
|
|
|
shapes[inputs] = [batch_size, in_channels, height, width] |
|
elif inputs.get_any_name() == "timestep_cond": |
|
shapes[inputs] = [batch_size, inputs.get_partial_shape()[1]] |
|
elif inputs.get_any_name() == "text_embeds": |
|
shapes[inputs] = [batch_size, self.text_encoder_2.config["projection_dim"]] |
|
elif inputs.get_any_name() == "time_ids": |
|
shapes[inputs] = [batch_size, inputs.get_partial_shape()[1]] |
|
else: |
|
shapes[inputs][0] = batch_size |
|
shapes[inputs][1] = tokenizer_max_length |
|
model.reshape(shapes) |
|
return model |
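
    # A minimal static-reshape sketch (assuming the `reshape()` method inherited from
    # `OVStableDiffusionPipeline`, which routes the unet through `_reshape_unet`):
    #
    #     pipeline.reshape(batch_size=1, height=512, width=512, num_images_per_prompt=1)
    #     pipeline.compile()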
|
|
|
def get_guidance_scale_embedding(self, w, embedding_dim=512, dtype=np.float32): |
|
""" |
|
see https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298 |
|
Args: |
|
timesteps: np.array: generate embedding vectors at these timesteps |
|
embedding_dim: int: dimension of the embeddings to generate |
|
dtype: data type of the generated embeddings |
|
|
|
Returns: |
|
embedding vectors with shape `(len(timesteps), embedding_dim)` |
|
""" |
|
        assert len(w.shape) == 1
        w = w * 1000.0  # scale the conditioning, following the reference implementation

        half_dim = embedding_dim // 2
        emb = np.log(10000.0) / (half_dim - 1)
        emb = np.exp(np.arange(half_dim, dtype=dtype) * -emb)
        emb = w.astype(dtype)[:, None] * emb[None, :]
        emb = np.concatenate([np.sin(emb), np.cos(emb)], axis=1)
        if embedding_dim % 2 == 1:
            # pad only the last (embedding) axis; np.pad(emb, (0, 1)) would pad the batch axis too
            emb = np.pad(emb, ((0, 0), (0, 1)))
|
assert emb.shape == (w.shape[0], embedding_dim) |
|
return emb |
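
    # Worked example: for w = np.array([7.0]) and embedding_dim = 4 (half_dim = 2), the frequencies
    # are exp(-k * ln(10000) / (half_dim - 1)) for k in {0, 1}, i.e. {1.0, 1e-4}, so the single row
    # returned is [sin(7000.0), sin(0.7), cos(7000.0), cos(0.7)].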
|
|
|
|
|
def __call__( |
|
self, |
|
prompt: Optional[Union[str, List[str]]] = None, |
|
height: Optional[int] = None, |
|
width: Optional[int] = None, |
|
num_inference_steps: int = 4, |
|
        original_inference_steps: Optional[int] = None,
|
guidance_scale: float = 7.5, |
|
num_images_per_prompt: int = 1, |
|
eta: float = 0.0, |
|
generator: Optional[np.random.RandomState] = None, |
|
latents: Optional[np.ndarray] = None, |
|
prompt_embeds: Optional[np.ndarray] = None, |
|
output_type: str = "pil", |
|
return_dict: bool = True, |
|
callback: Optional[Callable[[int, int, np.ndarray], None]] = None, |
|
callback_steps: int = 1, |
|
guidance_rescale: float = 0.0, |
|
): |
|
r""" |
|
Function invoked when calling the pipeline for generation. |
|
|
|
Args: |
|
prompt (`Optional[Union[str, List[str]]]`, defaults to None): |
|
                The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`
                instead.
|
height (`Optional[int]`, defaults to None): |
|
The height in pixels of the generated image. |
|
width (`Optional[int]`, defaults to None): |
|
The width in pixels of the generated image. |
|
num_inference_steps (`int`, defaults to 4): |
|
The number of denoising steps. More denoising steps usually lead to a higher quality image at the |
|
expense of slower inference. |
|
original_inference_steps (`int`, *optional*): |
|
                The original number of inference steps used to generate a linearly-spaced timestep schedule, from
                which we will draw `num_inference_steps` evenly spaced timesteps as our final timestep schedule,
                following the Skipping-Step method in the paper (see Section 4.3). If not set, this will default to
                the scheduler's `original_inference_steps` attribute.
|
            guidance_scale (`float`, defaults to 7.5):
                Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
                `guidance_scale` is defined as `w` of equation 2 of [Imagen
                Paper](https://arxiv.org/pdf/2205.11487.pdf). A higher guidance scale encourages the model to
                generate images that are closely linked to the text `prompt`, usually at the expense of lower image
                quality. Note that latent consistency models fold guidance into the `timestep_cond` embedding passed
                to the unet, so no classifier-free guidance over a doubled batch is performed.
|
num_images_per_prompt (`int`, defaults to 1): |
|
The number of images to generate per prompt. |
|
eta (`float`, defaults to 0.0): |
|
Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to |
|
[`schedulers.DDIMScheduler`], will be ignored for others. |
|
            generator (`Optional[np.random.RandomState]`, defaults to `None`):
                A np.random.RandomState to make generation deterministic.
|
latents (`Optional[np.ndarray]`, defaults to `None`): |
|
Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image |
|
generation. Can be used to tweak the same generation with different prompts. If not provided, a latents |
|
                tensor will be generated by sampling using the supplied random `generator`.
|
prompt_embeds (`Optional[np.ndarray]`, defaults to `None`): |
|
Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not |
|
provided, text embeddings will be generated from `prompt` input argument. |
|
output_type (`str`, defaults to `"pil"`): |
|
                The output format of the generated image. Choose between
|
[PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`. |
|
return_dict (`bool`, defaults to `True`): |
|
Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a |
|
plain tuple. |
|
callback (Optional[Callable], defaults to `None`): |
|
A function that will be called every `callback_steps` steps during inference. The function will be |
|
called with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`. |
|
callback_steps (`int`, defaults to 1): |
|
The frequency at which the `callback` function will be called. If not specified, the callback will be |
|
called at every step. |
|
guidance_rescale (`float`, defaults to 0.0): |
|
Guidance rescale factor proposed by [Common Diffusion Noise Schedules and Sample Steps are |
|
Flawed](https://arxiv.org/pdf/2305.08891.pdf) `guidance_scale` is defined as `φ` in equation 16. of |
|
[Common Diffusion Noise Schedules and Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf). |
|
Guidance rescale factor should fix overexposure when using zero terminal SNR. |
|
|
|
Returns: |
|
[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`: |
|
            [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple`.
|
When returning a tuple, the first element is a list with the generated images, and the second element is a |
|
list of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work" |
|
(nsfw) content, according to the `safety_checker`. |
|
""" |
|
height = height or self.unet.config.get("sample_size", 64) * self.vae_scale_factor |
|
width = width or self.unet.config.get("sample_size", 64) * self.vae_scale_factor |
|
|
|
|
|
self.check_inputs( |
|
prompt, height, width, callback_steps, None, prompt_embeds, None |
|
) |
|
|
|
|
|
if isinstance(prompt, str): |
|
batch_size = 1 |
|
elif isinstance(prompt, list): |
|
batch_size = len(prompt) |
|
else: |
|
batch_size = prompt_embeds.shape[0] |
|
|
|
if generator is None: |
|
generator = np.random |
|
|
|
|
|
        # The scheduler steps on torch tensors and may accept a torch.Generator, so derive one
        # deterministically from the numpy RandomState's internal state.
        torch_generator = torch.Generator().manual_seed(int(generator.get_state()[1][0]))
|
|
|
|
|
|
|
|
|
|
|
|
|
        # Encode the prompt once; the third positional argument disables classifier-free guidance,
        # since LCM folds guidance into the `timestep_cond` embedding instead.
        prompt_embeds = self._encode_prompt(
|
prompt, |
|
num_images_per_prompt, |
|
False, |
|
negative_prompt=None, |
|
prompt_embeds=prompt_embeds, |
|
negative_prompt_embeds=None, |
|
) |
|
|
|
|
|
        # The LCM scheduler draws `num_inference_steps` timesteps from the longer original schedule
        self.scheduler.set_timesteps(num_inference_steps, "cpu", original_inference_steps=original_inference_steps)
|
timesteps = self.scheduler.timesteps |
|
|
|
latents = self.prepare_latents( |
|
batch_size * num_images_per_prompt, |
|
self.unet.config.get("in_channels", 4), |
|
height, |
|
width, |
|
prompt_embeds.dtype, |
|
generator, |
|
latents, |
|
) |
|
|
|
|
|
        # Condition the unet on the guidance scale: LCM embeds `w = guidance_scale - 1` via
        # `get_guidance_scale_embedding` instead of running classifier-free guidance.
        w = np.tile(guidance_scale - 1, batch_size * num_images_per_prompt)
        w_embedding = self.get_guidance_scale_embedding(
            w, embedding_dim=self.unet.config.get("time_cond_proj_dim", 256)
        )
|
|
|
|
|
timestep_dtype = self.unet.input_dtype.get("timestep", np.float32) |
|
|
|
|
|
|
|
|
|
|
|
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys()) |
|
extra_step_kwargs = {} |
|
if accepts_eta: |
|
extra_step_kwargs["eta"] = eta |
|
|
|
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys()) |
|
if accepts_generator: |
|
extra_step_kwargs["generator"] = torch_generator |
|
|
|
num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order |
|
for i, t in enumerate(self.progress_bar(timesteps)): |
|
|
|
|
|
timestep = np.array([t], dtype=timestep_dtype) |
|
|
|
            noise_pred = self.unet(
                sample=latents, timestep=timestep, timestep_cond=w_embedding, encoder_hidden_states=prompt_embeds
            )[0]
|
|
|
|
|
            latents, denoised = self.scheduler.step(
                torch.from_numpy(noise_pred), t, torch.from_numpy(latents), **extra_step_kwargs, return_dict=False
            )
|
|
|
latents, denoised = latents.numpy(), denoised.numpy() |
|
|
|
|
|
if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0): |
|
if callback is not None and i % callback_steps == 0: |
|
callback(i, t, latents) |
|
|
|
if output_type == "latent": |
|
image = latents |
|
has_nsfw_concept = None |
|
else: |
|
denoised /= self.vae_decoder.config.get("scaling_factor", 0.18215) |
|
|
|
            image = np.concatenate(
                [self.vae_decoder(latent_sample=denoised[i : i + 1])[0] for i in range(denoised.shape[0])]
            )
|
image, has_nsfw_concept = self.run_safety_checker(image) |
|
|
|
if has_nsfw_concept is None: |
|
do_denormalize = [True] * image.shape[0] |
|
else: |
|
do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept] |
|
|
|
image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize) |
|
|
|
if not return_dict: |
|
return (image, has_nsfw_concept) |
|
|
|
return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept) |
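

# A minimal usage sketch, assuming an LCM checkpoint already converted to OpenVINO IR is available in
# a local directory "lcm-ov" (an illustrative path, not a published artifact) and that the scheduler
# saved with it is diffusers' `LCMScheduler`:
#
#     pipeline = OVLatentConsistencyModelPipeline.from_pretrained("lcm-ov", compile=True)
#     result = pipeline(
#         prompt="a photo of an astronaut riding a horse on mars",
#         num_inference_steps=4,
#         guidance_scale=8.0,
#     )
#     result.images[0].save("astronaut.png")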
|
|