|
import inspect |
|
import warnings |
|
from typing import Optional, Tuple, Union |
|
|
|
import torch |
|
|
|
from ...models import UNet2DModel, VQModel |
|
from ...pipeline_utils import DiffusionPipeline, ImagePipelineOutput |
|
from ...schedulers import DDIMScheduler |
|
|
|
|
|
class LDMPipeline(DiffusionPipeline): |
|
r""" |
|
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the |
|
library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.) |
|
|
|
Parameters: |
|
vqvae ([`VQModel`]): |
|
Vector-quantized (VQ) Model to encode and decode images to and from latent representations. |
|
unet ([`UNet2DModel`]): U-Net architecture to denoise the encoded image latents. |
|
scheduler ([`SchedulerMixin`]): |
|
[`DDIMScheduler`] is to be used in combination with `unet` to denoise the encoded image latens. |
|
""" |
|
|
|
def __init__(self, vqvae: VQModel, unet: UNet2DModel, scheduler: DDIMScheduler): |
|
super().__init__() |
|
scheduler = scheduler.set_format("pt") |
|
self.register_modules(vqvae=vqvae, unet=unet, scheduler=scheduler) |
|
|
|
@torch.no_grad() |
|
def __call__( |
|
self, |
|
batch_size: int = 1, |
|
generator: Optional[torch.Generator] = None, |
|
eta: float = 0.0, |
|
num_inference_steps: int = 50, |
|
output_type: Optional[str] = "pil", |
|
return_dict: bool = True, |
|
**kwargs, |
|
) -> Union[Tuple, ImagePipelineOutput]: |
|
|
|
r""" |
|
Args: |
|
batch_size (`int`, *optional*, defaults to 1): |
|
Number of images to generate. |
|
generator (`torch.Generator`, *optional*): |
|
A [torch generator](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make generation |
|
deterministic. |
|
num_inference_steps (`int`, *optional*, defaults to 50): |
|
The number of denoising steps. More denoising steps usually lead to a higher quality image at the |
|
expense of slower inference. |
|
output_type (`str`, *optional*, defaults to `"pil"`): |
|
The output format of the generate image. Choose between |
|
[PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `nd.array`. |
|
return_dict (`bool`, *optional*, defaults to `True`): |
|
Whether or not to return a [`~pipeline_utils.ImagePipelineOutput`] instead of a plain tuple. |
|
|
|
Returns: |
|
[`~pipeline_utils.ImagePipelineOutput`] or `tuple`: [`~pipelines.utils.ImagePipelineOutput`] if |
|
`return_dict` is True, otherwise a `tuple. When returning a tuple, the first element is a list with the |
|
generated images. |
|
""" |
|
|
|
if "torch_device" in kwargs: |
|
device = kwargs.pop("torch_device") |
|
warnings.warn( |
|
"`torch_device` is deprecated as an input argument to `__call__` and will be removed in v0.3.0." |
|
" Consider using `pipe.to(torch_device)` instead." |
|
) |
|
|
|
|
|
if device is None: |
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
self.to(device) |
|
|
|
latents = torch.randn( |
|
(batch_size, self.unet.in_channels, self.unet.sample_size, self.unet.sample_size), |
|
generator=generator, |
|
) |
|
latents = latents.to(self.device) |
|
|
|
self.scheduler.set_timesteps(num_inference_steps) |
|
|
|
|
|
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys()) |
|
|
|
extra_kwargs = {} |
|
if accepts_eta: |
|
extra_kwargs["eta"] = eta |
|
|
|
for t in self.progress_bar(self.scheduler.timesteps): |
|
|
|
noise_prediction = self.unet(latents, t).sample |
|
|
|
latents = self.scheduler.step(noise_prediction, t, latents, **extra_kwargs).prev_sample |
|
|
|
|
|
image = self.vqvae.decode(latents).sample |
|
|
|
image = (image / 2 + 0.5).clamp(0, 1) |
|
image = image.cpu().permute(0, 2, 3, 1).numpy() |
|
if output_type == "pil": |
|
image = self.numpy_to_pil(image) |
|
|
|
if not return_dict: |
|
return (image,) |
|
|
|
return ImagePipelineOutput(images=image) |
|
|