MasaCtrl / masactrl /diffuser_utils.py
ljzycmd
Add hugging face space demo.
5fc5efa
raw
history blame
11 kB
"""
Util functions based on Diffuser framework.
"""
import os
import torch
import cv2
import numpy as np
import torch.nn.functional as F
from tqdm import tqdm
from PIL import Image
from torchvision.utils import save_image
from torchvision.io import read_image
from diffusers import StableDiffusionPipeline
from pytorch_lightning import seed_everything
class MasaCtrlPipeline(StableDiffusionPipeline):
def next_step(
self,
model_output: torch.FloatTensor,
timestep: int,
x: torch.FloatTensor,
eta=0.,
verbose=False
):
"""
Inverse sampling for DDIM Inversion
"""
if verbose:
print("timestep: ", timestep)
next_step = timestep
timestep = min(timestep - self.scheduler.config.num_train_timesteps // self.scheduler.num_inference_steps, 999)
alpha_prod_t = self.scheduler.alphas_cumprod[timestep] if timestep >= 0 else self.scheduler.final_alpha_cumprod
alpha_prod_t_next = self.scheduler.alphas_cumprod[next_step]
beta_prod_t = 1 - alpha_prod_t
pred_x0 = (x - beta_prod_t**0.5 * model_output) / alpha_prod_t**0.5
pred_dir = (1 - alpha_prod_t_next)**0.5 * model_output
x_next = alpha_prod_t_next**0.5 * pred_x0 + pred_dir
return x_next, pred_x0
def step(
self,
model_output: torch.FloatTensor,
timestep: int,
x: torch.FloatTensor,
eta: float=0.0,
verbose=False,
):
"""
predict the sampe the next step in the denoise process.
"""
prev_timestep = timestep - self.scheduler.config.num_train_timesteps // self.scheduler.num_inference_steps
alpha_prod_t = self.scheduler.alphas_cumprod[timestep]
alpha_prod_t_prev = self.scheduler.alphas_cumprod[prev_timestep] if prev_timestep > 0 else self.scheduler.final_alpha_cumprod
beta_prod_t = 1 - alpha_prod_t
pred_x0 = (x - beta_prod_t**0.5 * model_output) / alpha_prod_t**0.5
pred_dir = (1 - alpha_prod_t_prev)**0.5 * model_output
x_prev = alpha_prod_t_prev**0.5 * pred_x0 + pred_dir
return x_prev, pred_x0
@torch.no_grad()
def image2latent(self, image):
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
if type(image) is Image:
image = np.array(image)
image = torch.from_numpy(image).float() / 127.5 - 1
image = image.permute(2, 0, 1).unsqueeze(0).to(DEVICE)
# input image density range [-1, 1]
latents = self.vae.encode(image)['latent_dist'].mean
latents = latents * 0.18215
return latents
@torch.no_grad()
def latent2image(self, latents, return_type='np'):
latents = 1 / 0.18215 * latents.detach()
image = self.vae.decode(latents)['sample']
if return_type == 'np':
image = (image / 2 + 0.5).clamp(0, 1)
image = image.cpu().permute(0, 2, 3, 1).numpy()[0]
image = (image * 255).astype(np.uint8)
elif return_type == "pt":
image = (image / 2 + 0.5).clamp(0, 1)
return image
def latent2image_grad(self, latents):
latents = 1 / 0.18215 * latents
image = self.vae.decode(latents)['sample']
return image # range [-1, 1]
@torch.no_grad()
def __call__(
self,
prompt,
batch_size=1,
height=512,
width=512,
num_inference_steps=50,
guidance_scale=7.5,
eta=0.0,
latents=None,
unconditioning=None,
neg_prompt=None,
ref_intermediate_latents=None,
return_intermediates=False,
**kwds):
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
if isinstance(prompt, list):
batch_size = len(prompt)
elif isinstance(prompt, str):
if batch_size > 1:
prompt = [prompt] * batch_size
# text embeddings
text_input = self.tokenizer(
prompt,
padding="max_length",
max_length=77,
return_tensors="pt"
)
text_embeddings = self.text_encoder(text_input.input_ids.to(DEVICE))[0]
print("input text embeddings :", text_embeddings.shape)
if kwds.get("dir"):
dir = text_embeddings[-2] - text_embeddings[-1]
u, s, v = torch.pca_lowrank(dir.transpose(-1, -2), q=1, center=True)
text_embeddings[-1] = text_embeddings[-1] + kwds.get("dir") * v
print(u.shape)
print(v.shape)
# define initial latents
latents_shape = (batch_size, self.unet.in_channels, height//8, width//8)
if latents is None:
latents = torch.randn(latents_shape, device=DEVICE)
else:
assert latents.shape == latents_shape, f"The shape of input latent tensor {latents.shape} should equal to predefined one."
# unconditional embedding for classifier free guidance
if guidance_scale > 1.:
max_length = text_input.input_ids.shape[-1]
if neg_prompt:
uc_text = neg_prompt
else:
uc_text = ""
# uc_text = "ugly, tiling, poorly drawn hands, poorly drawn feet, body out of frame, cut off, low contrast, underexposed, distorted face"
unconditional_input = self.tokenizer(
[uc_text] * batch_size,
padding="max_length",
max_length=77,
return_tensors="pt"
)
# unconditional_input.input_ids = unconditional_input.input_ids[:, 1:]
unconditional_embeddings = self.text_encoder(unconditional_input.input_ids.to(DEVICE))[0]
text_embeddings = torch.cat([unconditional_embeddings, text_embeddings], dim=0)
print("latents shape: ", latents.shape)
# iterative sampling
self.scheduler.set_timesteps(num_inference_steps)
# print("Valid timesteps: ", reversed(self.scheduler.timesteps))
latents_list = [latents]
pred_x0_list = [latents]
for i, t in enumerate(tqdm(self.scheduler.timesteps, desc="DDIM Sampler")):
if ref_intermediate_latents is not None:
# note that the batch_size >= 2
latents_ref = ref_intermediate_latents[-1 - i]
_, latents_cur = latents.chunk(2)
latents = torch.cat([latents_ref, latents_cur])
if guidance_scale > 1.:
model_inputs = torch.cat([latents] * 2)
else:
model_inputs = latents
if unconditioning is not None and isinstance(unconditioning, list):
_, text_embeddings = text_embeddings.chunk(2)
text_embeddings = torch.cat([unconditioning[i].expand(*text_embeddings.shape), text_embeddings])
# predict tghe noise
noise_pred = self.unet(model_inputs, t, encoder_hidden_states=text_embeddings).sample
if guidance_scale > 1.:
noise_pred_uncon, noise_pred_con = noise_pred.chunk(2, dim=0)
noise_pred = noise_pred_uncon + guidance_scale * (noise_pred_con - noise_pred_uncon)
# compute the previous noise sample x_t -> x_t-1
latents, pred_x0 = self.step(noise_pred, t, latents)
latents_list.append(latents)
pred_x0_list.append(pred_x0)
image = self.latent2image(latents, return_type="pt")
if return_intermediates:
pred_x0_list = [self.latent2image(img, return_type="pt") for img in pred_x0_list]
latents_list = [self.latent2image(img, return_type="pt") for img in latents_list]
return image, pred_x0_list, latents_list
return image
@torch.no_grad()
def invert(
self,
image: torch.Tensor,
prompt,
num_inference_steps=50,
guidance_scale=7.5,
eta=0.0,
return_intermediates=False,
**kwds):
"""
invert a real image into noise map with determinisc DDIM inversion
"""
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
batch_size = image.shape[0]
if isinstance(prompt, list):
if batch_size == 1:
image = image.expand(len(prompt), -1, -1, -1)
elif isinstance(prompt, str):
if batch_size > 1:
prompt = [prompt] * batch_size
# text embeddings
text_input = self.tokenizer(
prompt,
padding="max_length",
max_length=77,
return_tensors="pt"
)
text_embeddings = self.text_encoder(text_input.input_ids.to(DEVICE))[0]
print("input text embeddings :", text_embeddings.shape)
# define initial latents
latents = self.image2latent(image)
start_latents = latents
# print(latents)
# exit()
# unconditional embedding for classifier free guidance
if guidance_scale > 1.:
max_length = text_input.input_ids.shape[-1]
unconditional_input = self.tokenizer(
[""] * batch_size,
padding="max_length",
max_length=77,
return_tensors="pt"
)
unconditional_embeddings = self.text_encoder(unconditional_input.input_ids.to(DEVICE))[0]
text_embeddings = torch.cat([unconditional_embeddings, text_embeddings], dim=0)
print("latents shape: ", latents.shape)
# interative sampling
self.scheduler.set_timesteps(num_inference_steps)
print("Valid timesteps: ", reversed(self.scheduler.timesteps))
# print("attributes: ", self.scheduler.__dict__)
latents_list = [latents]
pred_x0_list = [latents]
for i, t in enumerate(tqdm(reversed(self.scheduler.timesteps), desc="DDIM Inversion")):
if guidance_scale > 1.:
model_inputs = torch.cat([latents] * 2)
else:
model_inputs = latents
# predict the noise
noise_pred = self.unet(model_inputs, t, encoder_hidden_states=text_embeddings).sample
if guidance_scale > 1.:
noise_pred_uncon, noise_pred_con = noise_pred.chunk(2, dim=0)
noise_pred = noise_pred_uncon + guidance_scale * (noise_pred_con - noise_pred_uncon)
# compute the previous noise sample x_t-1 -> x_t
latents, pred_x0 = self.next_step(noise_pred, t, latents)
latents_list.append(latents)
pred_x0_list.append(pred_x0)
if return_intermediates:
# return the intermediate laters during inversion
# pred_x0_list = [self.latent2image(img, return_type="pt") for img in pred_x0_list]
return latents, latents_list
return latents, start_latents